diff --git a/CHANGELOG.md b/CHANGELOG.md index c176b375..8a83b3d6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,7 @@ All notable changes to this project will be documented in this file. - `SparkApplication`: `spark-env.sh` and `security.properties` - `SparkHistoryServer`: `spark-defaults.conf`, `spark-env.sh` and `security.properties` - `SparkConnectServer`: `spark-defaults.conf`, `metrics.properties` and `security.properties` +- Internal operator refactoring: introduce dereference() and validate() steps in the reconciler for spark application, spark connect and spark history server([#687]). Previously, arbitrary file names were silently accepted and ignored ([#679]). - Bump `stackable-operator` to 0.110.1 ([#679]). @@ -27,6 +28,7 @@ All notable changes to this project will be documented in this file. [#679]: https://github.com/stackabletech/spark-k8s-operator/pull/679 [#680]: https://github.com/stackabletech/spark-k8s-operator/pull/680 [#684]: https://github.com/stackabletech/spark-k8s-operator/pull/684 +[#687]: https://github.com/stackabletech/spark-k8s-operator/pull/687 ## [26.3.0] - 2026-03-16 diff --git a/Cargo.nix b/Cargo.nix index b3bbcbf3..b7cf9ecb 100644 --- a/Cargo.nix +++ b/Cargo.nix @@ -4823,7 +4823,7 @@ rec { src = pkgs.fetchgit { url = "https://github.com/stackabletech/operator-rs.git"; rev = "7a5f0c3fbcd091340214a23f0607fcd4b4fcc152"; - sha256 = "0d58yvxvy8hbai12bjhcyvh4zw182j5dsfyqja4k2xc1vzjy29by"; + sha256 = "0lj969rjbxairjglrnaq0xhabvdrq5nd6wl1i0y9pr50nhh7zvgk"; }; libName = "k8s_version"; authors = [ @@ -9471,7 +9471,7 @@ rec { src = pkgs.fetchgit { url = "https://github.com/stackabletech/operator-rs.git"; rev = "7a5f0c3fbcd091340214a23f0607fcd4b4fcc152"; - sha256 = "0d58yvxvy8hbai12bjhcyvh4zw182j5dsfyqja4k2xc1vzjy29by"; + sha256 = "0lj969rjbxairjglrnaq0xhabvdrq5nd6wl1i0y9pr50nhh7zvgk"; }; libName = "stackable_certs"; authors = [ @@ -9574,7 +9574,7 @@ rec { src = pkgs.fetchgit { url = "https://github.com/stackabletech/operator-rs.git"; rev = "7a5f0c3fbcd091340214a23f0607fcd4b4fcc152"; - sha256 = "0d58yvxvy8hbai12bjhcyvh4zw182j5dsfyqja4k2xc1vzjy29by"; + sha256 = "0lj969rjbxairjglrnaq0xhabvdrq5nd6wl1i0y9pr50nhh7zvgk"; }; libName = "stackable_operator"; authors = [ @@ -9754,7 +9754,7 @@ rec { src = pkgs.fetchgit { url = "https://github.com/stackabletech/operator-rs.git"; rev = "7a5f0c3fbcd091340214a23f0607fcd4b4fcc152"; - sha256 = "0d58yvxvy8hbai12bjhcyvh4zw182j5dsfyqja4k2xc1vzjy29by"; + sha256 = "0lj969rjbxairjglrnaq0xhabvdrq5nd6wl1i0y9pr50nhh7zvgk"; }; procMacro = true; libName = "stackable_operator_derive"; @@ -9789,7 +9789,7 @@ rec { src = pkgs.fetchgit { url = "https://github.com/stackabletech/operator-rs.git"; rev = "7a5f0c3fbcd091340214a23f0607fcd4b4fcc152"; - sha256 = "0d58yvxvy8hbai12bjhcyvh4zw182j5dsfyqja4k2xc1vzjy29by"; + sha256 = "0lj969rjbxairjglrnaq0xhabvdrq5nd6wl1i0y9pr50nhh7zvgk"; }; libName = "stackable_shared"; authors = [ @@ -9980,7 +9980,7 @@ rec { src = pkgs.fetchgit { url = "https://github.com/stackabletech/operator-rs.git"; rev = "7a5f0c3fbcd091340214a23f0607fcd4b4fcc152"; - sha256 = "0d58yvxvy8hbai12bjhcyvh4zw182j5dsfyqja4k2xc1vzjy29by"; + sha256 = "0lj969rjbxairjglrnaq0xhabvdrq5nd6wl1i0y9pr50nhh7zvgk"; }; libName = "stackable_telemetry"; authors = [ @@ -10090,7 +10090,7 @@ rec { src = pkgs.fetchgit { url = "https://github.com/stackabletech/operator-rs.git"; rev = "7a5f0c3fbcd091340214a23f0607fcd4b4fcc152"; - sha256 = "0d58yvxvy8hbai12bjhcyvh4zw182j5dsfyqja4k2xc1vzjy29by"; + sha256 = "0lj969rjbxairjglrnaq0xhabvdrq5nd6wl1i0y9pr50nhh7zvgk"; }; libName = "stackable_versioned"; authors = [ @@ -10140,7 +10140,7 @@ rec { src = pkgs.fetchgit { url = "https://github.com/stackabletech/operator-rs.git"; rev = "7a5f0c3fbcd091340214a23f0607fcd4b4fcc152"; - sha256 = "0d58yvxvy8hbai12bjhcyvh4zw182j5dsfyqja4k2xc1vzjy29by"; + sha256 = "0lj969rjbxairjglrnaq0xhabvdrq5nd6wl1i0y9pr50nhh7zvgk"; }; procMacro = true; libName = "stackable_versioned_macros"; @@ -10208,7 +10208,7 @@ rec { src = pkgs.fetchgit { url = "https://github.com/stackabletech/operator-rs.git"; rev = "7a5f0c3fbcd091340214a23f0607fcd4b4fcc152"; - sha256 = "0d58yvxvy8hbai12bjhcyvh4zw182j5dsfyqja4k2xc1vzjy29by"; + sha256 = "0lj969rjbxairjglrnaq0xhabvdrq5nd6wl1i0y9pr50nhh7zvgk"; }; libName = "stackable_webhook"; authors = [ diff --git a/crate-hashes.json b/crate-hashes.json index a6396ca0..86f2b840 100644 --- a/crate-hashes.json +++ b/crate-hashes.json @@ -1,12 +1,12 @@ { - "git+https://github.com/stackabletech/operator-rs.git?tag=stackable-operator-0.111.1#k8s-version@0.1.3": "0d58yvxvy8hbai12bjhcyvh4zw182j5dsfyqja4k2xc1vzjy29by", - "git+https://github.com/stackabletech/operator-rs.git?tag=stackable-operator-0.111.1#stackable-certs@0.4.0": "0d58yvxvy8hbai12bjhcyvh4zw182j5dsfyqja4k2xc1vzjy29by", - "git+https://github.com/stackabletech/operator-rs.git?tag=stackable-operator-0.111.1#stackable-operator-derive@0.3.1": "0d58yvxvy8hbai12bjhcyvh4zw182j5dsfyqja4k2xc1vzjy29by", - "git+https://github.com/stackabletech/operator-rs.git?tag=stackable-operator-0.111.1#stackable-operator@0.111.1": "0d58yvxvy8hbai12bjhcyvh4zw182j5dsfyqja4k2xc1vzjy29by", - "git+https://github.com/stackabletech/operator-rs.git?tag=stackable-operator-0.111.1#stackable-shared@0.1.0": "0d58yvxvy8hbai12bjhcyvh4zw182j5dsfyqja4k2xc1vzjy29by", - "git+https://github.com/stackabletech/operator-rs.git?tag=stackable-operator-0.111.1#stackable-telemetry@0.6.3": "0d58yvxvy8hbai12bjhcyvh4zw182j5dsfyqja4k2xc1vzjy29by", - "git+https://github.com/stackabletech/operator-rs.git?tag=stackable-operator-0.111.1#stackable-versioned-macros@0.10.0": "0d58yvxvy8hbai12bjhcyvh4zw182j5dsfyqja4k2xc1vzjy29by", - "git+https://github.com/stackabletech/operator-rs.git?tag=stackable-operator-0.111.1#stackable-versioned@0.10.0": "0d58yvxvy8hbai12bjhcyvh4zw182j5dsfyqja4k2xc1vzjy29by", - "git+https://github.com/stackabletech/operator-rs.git?tag=stackable-operator-0.111.1#stackable-webhook@0.9.1": "0d58yvxvy8hbai12bjhcyvh4zw182j5dsfyqja4k2xc1vzjy29by", + "git+https://github.com/stackabletech/operator-rs.git?tag=stackable-operator-0.111.1#k8s-version@0.1.3": "0lj969rjbxairjglrnaq0xhabvdrq5nd6wl1i0y9pr50nhh7zvgk", + "git+https://github.com/stackabletech/operator-rs.git?tag=stackable-operator-0.111.1#stackable-certs@0.4.0": "0lj969rjbxairjglrnaq0xhabvdrq5nd6wl1i0y9pr50nhh7zvgk", + "git+https://github.com/stackabletech/operator-rs.git?tag=stackable-operator-0.111.1#stackable-operator-derive@0.3.1": "0lj969rjbxairjglrnaq0xhabvdrq5nd6wl1i0y9pr50nhh7zvgk", + "git+https://github.com/stackabletech/operator-rs.git?tag=stackable-operator-0.111.1#stackable-operator@0.111.1": "0lj969rjbxairjglrnaq0xhabvdrq5nd6wl1i0y9pr50nhh7zvgk", + "git+https://github.com/stackabletech/operator-rs.git?tag=stackable-operator-0.111.1#stackable-shared@0.1.0": "0lj969rjbxairjglrnaq0xhabvdrq5nd6wl1i0y9pr50nhh7zvgk", + "git+https://github.com/stackabletech/operator-rs.git?tag=stackable-operator-0.111.1#stackable-telemetry@0.6.3": "0lj969rjbxairjglrnaq0xhabvdrq5nd6wl1i0y9pr50nhh7zvgk", + "git+https://github.com/stackabletech/operator-rs.git?tag=stackable-operator-0.111.1#stackable-versioned-macros@0.10.0": "0lj969rjbxairjglrnaq0xhabvdrq5nd6wl1i0y9pr50nhh7zvgk", + "git+https://github.com/stackabletech/operator-rs.git?tag=stackable-operator-0.111.1#stackable-versioned@0.10.0": "0lj969rjbxairjglrnaq0xhabvdrq5nd6wl1i0y9pr50nhh7zvgk", + "git+https://github.com/stackabletech/operator-rs.git?tag=stackable-operator-0.111.1#stackable-webhook@0.9.1": "0lj969rjbxairjglrnaq0xhabvdrq5nd6wl1i0y9pr50nhh7zvgk", "git+https://github.com/stackabletech/product-config.git?tag=0.8.0#product-config@0.8.0": "1dz70kapm2wdqcr7ndyjji0lhsl98bsq95gnb2lw487wf6yr7987" } \ No newline at end of file diff --git a/rust/operator-binary/src/connect/controller.rs b/rust/operator-binary/src/connect/controller.rs index 28f2732e..6b578181 100644 --- a/rust/operator-binary/src/connect/controller.rs +++ b/rust/operator-binary/src/connect/controller.rs @@ -3,7 +3,7 @@ use std::sync::Arc; use snafu::{ResultExt, Snafu}; use stackable_operator::{ cluster_resources::{ClusterResourceApplyStrategy, ClusterResources}, - commons::{product_image_selection, rbac::build_rbac_resources}, + commons::rbac::build_rbac_resources, kube::{ Resource, ResourceExt, core::{DeserializeGuard, error_boundary}, @@ -21,10 +21,13 @@ use strum::{EnumDiscriminants, IntoStaticStr}; use super::crd::{CONNECT_APP_NAME, CONNECT_CONTROLLER_NAME, v1alpha1}; use crate::{ Ctx, - connect::{common, crd::SparkConnectServerStatus, executor, s3, server, service}, - crd::constants::{CONTAINER_IMAGE_BASE_NAME, OPERATOR_NAME}, + connect::{common, crd::SparkConnectServerStatus, executor, server, service}, + crd::constants::OPERATOR_NAME, }; +pub mod dereference; +pub mod validate; + #[derive(Snafu, Debug, EnumDiscriminants)] #[strum_discriminants(derive(IntoStaticStr))] #[allow(clippy::enum_variant_names)] @@ -124,11 +127,6 @@ pub enum Error { BuildRbacResources { source: stackable_operator::commons::rbac::Error, }, - #[snafu(display("failed to build connect server configuration"))] - ServerConfig { source: crate::connect::crd::Error }, - - #[snafu(display("failed to build connect executor configuration"))] - ExecutorConfig { source: crate::connect::crd::Error }, #[snafu(display("failed to build connect executor pod template"))] ExecutorPodTemplate { @@ -138,16 +136,14 @@ pub enum Error { #[snafu(display("failed to serialize executor pod template"))] ExecutorPodTemplateSerde { source: serde_yaml::Error }, - #[snafu(display("failed to resolve product image"))] - ResolveProductImage { - source: product_image_selection::Error, - }, - - #[snafu(display("failed to resolve S3 connections for SparkConnectServer {name:?}"))] - ResolveS3Connections { source: s3::Error, name: String }, - #[snafu(display("failed to build connect server S3 properties"))] S3SparkProperties { source: crate::connect::s3::Error }, + + #[snafu(display("failed to dereference SparkConnectServer"))] + DereferenceSparkConnectServer { source: dereference::Error }, + + #[snafu(display("failed to validate SparkConnectServer"))] + ValidateSparkConnectServer { source: validate::Error }, } type Result = std::result::Result; @@ -170,12 +166,22 @@ pub async fn reconcile( .map_err(error_boundary::InvalidObject::clone) .context(InvalidSparkConnectServerSnafu)?; - let server_config = scs.server_config().context(ServerConfigSnafu)?; - let server_role_config = &scs.spec.server.role_config; - let executor_config = scs.executor_config().context(ExecutorConfigSnafu)?; - let client = &ctx.client; + let dereferenced = dereference::dereference(client, scs) + .await + .context(DereferenceSparkConnectServerSnafu)?; + + let validated = validate::validate(scs, dereferenced, &ctx.operator_environment) + .context(ValidateSparkConnectServerSnafu)?; + + let server_config = &validated.server_config; + let executor_config = &validated.executor_config; + let resolved_product_image = &validated.resolved_product_image; + let resolved_s3 = &validated.dereferenced.resolved_s3; + + let server_role_config = &scs.spec.server.role_config; + let mut cluster_resources = ClusterResources::new( CONNECT_APP_NAME, OPERATOR_NAME, @@ -186,23 +192,6 @@ pub async fn reconcile( ) .context(CreateClusterResourcesSnafu)?; - let resolved_product_image = scs - .spec - .image - .resolve( - CONTAINER_IMAGE_BASE_NAME, - &ctx.operator_environment.image_repository, - crate::built_info::PKG_VERSION, - ) - .context(ResolveProductImageSnafu)?; - - // Resolve any S3 connections early to fail fast if there are issues. - let resolved_s3 = s3::ResolvedS3::resolve(client, scs) - .await - .with_context(|_| ResolveS3ConnectionsSnafu { - name: scs.name_unchecked(), - })?; - // Use a dedicated service account for connect server pods. let (service_account, role_binding) = build_rbac_resources( scs, @@ -251,13 +240,13 @@ pub async fn reconcile( .context(S3SparkPropertiesSnafu)?, server::server_properties( scs, - &server_config, + server_config, &applied_headless_service, &service_account, - &resolved_product_image, + resolved_product_image, ) .context(ServerPropertiesSnafu)?, - executor::executor_properties(scs, &executor_config, &resolved_product_image) + executor::executor_properties(scs, executor_config, resolved_product_image) .context(ExecutorPropertiesSnafu)?, ]) .context(SerializePropertiesSnafu)?; @@ -265,7 +254,7 @@ pub async fn reconcile( // ======================================== // Executor config map and pod template let executor_config_map = - executor::executor_config_map(scs, &executor_config, &resolved_product_image).context( + executor::executor_config_map(scs, executor_config, resolved_product_image).context( BuildExecutorConfigMapSnafu { name: scs.name_unchecked(), }, @@ -280,10 +269,10 @@ pub async fn reconcile( let executor_pod_template = serde_yaml::to_string( &executor::executor_pod_template( scs, - &executor_config, - &resolved_product_image, + executor_config, + resolved_product_image, &executor_config_map, - &resolved_s3, + resolved_s3, ) .context(ExecutorPodTemplateSnafu)?, ) @@ -293,8 +282,8 @@ pub async fn reconcile( // Server config map let server_config_map = server::server_config_map( scs, - &server_config, - &resolved_product_image, + server_config, + resolved_product_image, &spark_props, &executor_pod_template, ) @@ -310,7 +299,7 @@ pub async fn reconcile( // ======================================== // Server listener - let listener = server::build_listener(scs, server_role_config, &resolved_product_image) + let listener = server::build_listener(scs, server_role_config, resolved_product_image) .context(BuildListenerSnafu)?; let applied_listener = cluster_resources @@ -323,13 +312,13 @@ pub async fn reconcile( let args = server::command_args(&scs.spec.args); let stateful_set = server::build_stateful_set( scs, - &server_config, - &resolved_product_image, + server_config, + resolved_product_image, &service_account, &server_config_map, &applied_listener.name_any(), args, - &resolved_s3, + resolved_s3, ) .context(BuildServerStatefulSetSnafu)?; diff --git a/rust/operator-binary/src/connect/controller/dereference.rs b/rust/operator-binary/src/connect/controller/dereference.rs new file mode 100644 index 00000000..32f99414 --- /dev/null +++ b/rust/operator-binary/src/connect/controller/dereference.rs @@ -0,0 +1,33 @@ +//! The dereference step in the SparkConnectServer controller. +//! +//! Fetches the resolved S3 configuration referenced by the SparkConnectServer spec. + +use snafu::{ResultExt, Snafu}; +use stackable_operator::{client::Client, kube::ResourceExt}; + +use crate::connect::{crd::v1alpha1, s3}; + +#[derive(Snafu, Debug)] +pub enum Error { + #[snafu(display("failed to resolve S3 connection for {name}"))] + ResolveS3Connections { source: s3::Error, name: String }, +} + +type Result = std::result::Result; + +pub struct DereferencedSparkConnectServer { + pub resolved_s3: s3::ResolvedS3, +} + +pub async fn dereference( + client: &Client, + scs: &v1alpha1::SparkConnectServer, +) -> Result { + let resolved_s3 = s3::ResolvedS3::resolve(client, scs) + .await + .with_context(|_| ResolveS3ConnectionsSnafu { + name: scs.name_unchecked(), + })?; + + Ok(DereferencedSparkConnectServer { resolved_s3 }) +} diff --git a/rust/operator-binary/src/connect/controller/validate.rs b/rust/operator-binary/src/connect/controller/validate.rs new file mode 100644 index 00000000..c009316d --- /dev/null +++ b/rust/operator-binary/src/connect/controller/validate.rs @@ -0,0 +1,66 @@ +//! The validate step in the SparkConnectServer controller. +//! +//! Resolves the product image and the server/executor configs. Does not touch K8s. + +use snafu::{ResultExt, Snafu}; +use stackable_operator::{ + cli::OperatorEnvironmentOptions, + commons::product_image_selection::{self, ResolvedProductImage}, +}; + +use crate::{ + connect::{ + controller::dereference::DereferencedSparkConnectServer, + crd::{self, v1alpha1}, + }, + crd::constants::CONTAINER_IMAGE_BASE_NAME, +}; + +#[derive(Snafu, Debug)] +pub enum Error { + #[snafu(display("failed to resolve product image"))] + ResolveProductImage { + source: product_image_selection::Error, + }, + + #[snafu(display("failed to resolve server config"))] + ServerConfig { source: crd::Error }, + + #[snafu(display("failed to resolve executor config"))] + ExecutorConfig { source: crd::Error }, +} + +type Result = std::result::Result; + +pub struct ValidatedSparkConnectServer { + pub dereferenced: DereferencedSparkConnectServer, + pub resolved_product_image: ResolvedProductImage, + pub server_config: v1alpha1::ServerConfig, + pub executor_config: v1alpha1::ExecutorConfig, +} + +pub fn validate( + scs: &v1alpha1::SparkConnectServer, + dereferenced: DereferencedSparkConnectServer, + operator_environment: &OperatorEnvironmentOptions, +) -> Result { + let resolved_product_image = scs + .spec + .image + .resolve( + CONTAINER_IMAGE_BASE_NAME, + &operator_environment.image_repository, + crate::built_info::PKG_VERSION, + ) + .context(ResolveProductImageSnafu)?; + + let server_config = scs.server_config().context(ServerConfigSnafu)?; + let executor_config = scs.executor_config().context(ExecutorConfigSnafu)?; + + Ok(ValidatedSparkConnectServer { + dereferenced, + resolved_product_image, + server_config, + executor_config, + }) +} diff --git a/rust/operator-binary/src/crd/logdir.rs b/rust/operator-binary/src/crd/logdir.rs index 65f2e936..75b3c3c3 100644 --- a/rust/operator-binary/src/crd/logdir.rs +++ b/rust/operator-binary/src/crd/logdir.rs @@ -26,15 +26,9 @@ use crate::crd::{ #[strum_discriminants(derive(IntoStaticStr))] #[allow(clippy::enum_variant_names)] pub enum Error { - #[snafu(display("missing bucket name for history logs"))] - BucketNameMissing, - #[snafu(display("tls non-verification not supported"))] S3TlsNoVerificationNotSupported, - #[snafu(display("ca-cert verification not supported"))] - S3TlsCaVerificationNotSupported, - #[snafu(display("failed to build TLS certificate SecretClass Volume"))] TlsCertSecretClassVolumeBuild { source: SecretOperatorVolumeSourceBuilderError, diff --git a/rust/operator-binary/src/history/history_controller.rs b/rust/operator-binary/src/history/controller.rs similarity index 92% rename from rust/operator-binary/src/history/history_controller.rs rename to rust/operator-binary/src/history/controller.rs index a5401fd8..2fcdfe5a 100644 --- a/rust/operator-binary/src/history/history_controller.rs +++ b/rust/operator-binary/src/history/controller.rs @@ -21,10 +21,7 @@ use stackable_operator::{ }, }, cluster_resources::{ClusterResourceApplyStrategy, ClusterResources}, - commons::{ - product_image_selection::{self, ResolvedProductImage}, - rbac::build_rbac_resources, - }, + commons::{product_image_selection::ResolvedProductImage, rbac::build_rbac_resources}, crd::listener, k8s_openapi::{ DeepMerge, @@ -57,13 +54,12 @@ use crate::{ Ctx, crd::{ constants::{ - ACCESS_KEY_ID, CONTAINER_IMAGE_BASE_NAME, HISTORY_APP_NAME, HISTORY_CONTROLLER_NAME, - HISTORY_UI_PORT, JVM_SECURITY_PROPERTIES_FILE, LISTENER_VOLUME_DIR, - LISTENER_VOLUME_NAME, MAX_SPARK_LOG_FILES_SIZE, METRICS_PORT, OPERATOR_NAME, - SECRET_ACCESS_KEY, SPARK_DEFAULTS_FILE_NAME, SPARK_ENV_SH_FILE_NAME, - STACKABLE_TRUST_STORE, VOLUME_MOUNT_NAME_CONFIG, VOLUME_MOUNT_NAME_LOG, - VOLUME_MOUNT_NAME_LOG_CONFIG, VOLUME_MOUNT_PATH_CONFIG, VOLUME_MOUNT_PATH_LOG, - VOLUME_MOUNT_PATH_LOG_CONFIG, + ACCESS_KEY_ID, HISTORY_APP_NAME, HISTORY_CONTROLLER_NAME, HISTORY_UI_PORT, + JVM_SECURITY_PROPERTIES_FILE, LISTENER_VOLUME_DIR, LISTENER_VOLUME_NAME, + MAX_SPARK_LOG_FILES_SIZE, METRICS_PORT, OPERATOR_NAME, SECRET_ACCESS_KEY, + SPARK_DEFAULTS_FILE_NAME, SPARK_ENV_SH_FILE_NAME, STACKABLE_TRUST_STORE, + VOLUME_MOUNT_NAME_CONFIG, VOLUME_MOUNT_NAME_LOG, VOLUME_MOUNT_NAME_LOG_CONFIG, + VOLUME_MOUNT_PATH_CONFIG, VOLUME_MOUNT_PATH_LOG, VOLUME_MOUNT_PATH_LOG_CONFIG, }, history::{self, HistoryConfig, SparkHistoryServerContainer, v1alpha1}, listener_ext, @@ -78,6 +74,9 @@ use crate::{ product_logging::{self}, }; +pub mod dereference; +pub mod validate; + #[derive(Snafu, Debug, EnumDiscriminants)] #[strum_discriminants(derive(IntoStaticStr))] #[allow(clippy::enum_variant_names)] @@ -141,8 +140,11 @@ pub enum Error { source: stackable_operator::cluster_resources::Error, }, - #[snafu(display("product config validation failed"))] - ProductConfigValidation { source: crate::crd::history::Error }, + #[snafu(display("failed to dereference SparkHistoryServer"))] + DereferenceSparkHistoryServer { source: dereference::Error }, + + #[snafu(display("failed to validate SparkHistoryServer"))] + ValidateSparkHistoryServer { source: validate::Error }, #[snafu(display("failed to resolve and merge config for role and role group"))] FailedToResolveConfig { source: crate::crd::history::Error }, @@ -235,12 +237,7 @@ pub enum Error { source: stackable_operator::cluster_resources::Error, }, - #[snafu(display("failed to resolve product image"))] - ResolveProductImage { - source: product_image_selection::Error, - }, - - #[snafu(display("failed to resolve product image"))] + #[snafu(display("failed to build metrics service"))] BuildMetricsService { source: service::Error }, } @@ -264,6 +261,18 @@ pub async fn reconcile( let client = &ctx.client; + let dereferenced = dereference::dereference(client, shs) + .await + .context(DereferenceSparkHistoryServerSnafu)?; + + let validated = validate::validate( + shs, + dereferenced, + &ctx.operator_environment, + &ctx.product_config, + ) + .context(ValidateSparkHistoryServerSnafu)?; + let mut cluster_resources = ClusterResources::new( HISTORY_APP_NAME, OPERATOR_NAME, @@ -274,22 +283,8 @@ pub async fn reconcile( ) .context(CreateClusterResourcesSnafu)?; - let resolved_product_image = shs - .spec - .image - .resolve( - CONTAINER_IMAGE_BASE_NAME, - &ctx.operator_environment.image_repository, - crate::built_info::PKG_VERSION, - ) - .context(ResolveProductImageSnafu)?; - let log_dir = ResolvedLogDir::resolve( - &shs.spec.log_file_directory, - shs.metadata.namespace.clone(), - client, - ) - .await - .context(LogDirSnafu)?; + let resolved_product_image = &validated.resolved_product_image; + let log_dir = &validated.dereferenced.log_dir; // Use a dedicated service account for history server pods. let (service_account, role_binding) = build_rbac_resources( @@ -310,11 +305,7 @@ pub async fn reconcile( .context(ApplyRoleBindingSnafu)?; // The role_name is always HISTORY_ROLE_NAME - for (role_name, role_config) in shs - .validated_role_config(&resolved_product_image, &ctx.product_config) - .context(ProductConfigValidationSnafu)? - .iter() - { + for (role_name, role_config) in validated.product_config.iter() { for (rolegroup_name, rolegroup_config) in role_config.iter() { let rgr = RoleGroupRef { cluster: ObjectRef::from_obj(shs), @@ -332,18 +323,18 @@ pub async fn reconcile( &merged_config, &resolved_product_image.app_version_label_value, &rgr, - &log_dir, + log_dir, )?; let metrics_service = - build_rolegroup_metrics_service(shs, &resolved_product_image, &rgr) + build_rolegroup_metrics_service(shs, resolved_product_image, &rgr) .context(BuildMetricsServiceSnafu)?; let sts = build_stateful_set( shs, - &resolved_product_image, + resolved_product_image, &rgr, - &log_dir, + log_dir, &merged_config, &service_account, )?; @@ -364,7 +355,7 @@ pub async fn reconcile( let rg_group_listener = build_group_listener( shs, - &resolved_product_image, + resolved_product_image, role_name, shs.node_listener_class().to_string(), )?; diff --git a/rust/operator-binary/src/history/controller/dereference.rs b/rust/operator-binary/src/history/controller/dereference.rs new file mode 100644 index 00000000..046f1d55 --- /dev/null +++ b/rust/operator-binary/src/history/controller/dereference.rs @@ -0,0 +1,37 @@ +//! The dereference step in the SparkHistoryServer controller. +//! +//! Fetches the resolved log directory (including the underlying S3 connection/bucket and +//! TLS secret if any) and returns it in [`DereferencedSparkHistoryServer`]. + +use snafu::{ResultExt, Snafu}; +use stackable_operator::client::Client; + +use crate::crd::{history::v1alpha1, logdir::ResolvedLogDir}; + +#[derive(Snafu, Debug)] +pub enum Error { + #[snafu(display("failed to resolve log directory"))] + LogDir { source: crate::crd::logdir::Error }, +} + +type Result = std::result::Result; + +/// Kubernetes objects referenced from a SparkHistoryServer, already fetched. +pub struct DereferencedSparkHistoryServer { + pub log_dir: ResolvedLogDir, +} + +pub async fn dereference( + client: &Client, + shs: &v1alpha1::SparkHistoryServer, +) -> Result { + let log_dir = ResolvedLogDir::resolve( + &shs.spec.log_file_directory, + shs.metadata.namespace.clone(), + client, + ) + .await + .context(LogDirSnafu)?; + + Ok(DereferencedSparkHistoryServer { log_dir }) +} diff --git a/rust/operator-binary/src/history/controller/validate.rs b/rust/operator-binary/src/history/controller/validate.rs new file mode 100644 index 00000000..351b6f83 --- /dev/null +++ b/rust/operator-binary/src/history/controller/validate.rs @@ -0,0 +1,63 @@ +//! The validate step in the SparkHistoryServer controller. +//! +//! Resolves the product image and runs role/role-group config validation. +//! Does not touch the Kubernetes API. + +use product_config::ProductConfigManager; +use snafu::{ResultExt, Snafu}; +use stackable_operator::{ + cli::OperatorEnvironmentOptions, + commons::product_image_selection::{self, ResolvedProductImage}, + product_config_utils::ValidatedRoleConfigByPropertyKind, +}; + +use crate::{ + crd::{constants::CONTAINER_IMAGE_BASE_NAME, history::v1alpha1}, + history::controller::dereference::DereferencedSparkHistoryServer, +}; + +#[derive(Snafu, Debug)] +pub enum Error { + #[snafu(display("failed to resolve product image"))] + ResolveProductImage { + source: product_image_selection::Error, + }, + + #[snafu(display("invalid product config"))] + InvalidProductConfig { source: crate::crd::history::Error }, +} + +type Result = std::result::Result; + +pub struct ValidatedSparkHistoryServer { + pub dereferenced: DereferencedSparkHistoryServer, + pub resolved_product_image: ResolvedProductImage, + pub product_config: ValidatedRoleConfigByPropertyKind, +} + +pub fn validate( + shs: &v1alpha1::SparkHistoryServer, + dereferenced: DereferencedSparkHistoryServer, + operator_environment: &OperatorEnvironmentOptions, + product_config: &ProductConfigManager, +) -> Result { + let resolved_product_image = shs + .spec + .image + .resolve( + CONTAINER_IMAGE_BASE_NAME, + &operator_environment.image_repository, + crate::built_info::PKG_VERSION, + ) + .context(ResolveProductImageSnafu)?; + + let product_config_validated = shs + .validated_role_config(&resolved_product_image, product_config) + .context(InvalidProductConfigSnafu)?; + + Ok(ValidatedSparkHistoryServer { + dereferenced, + resolved_product_image, + product_config: product_config_validated, + }) +} diff --git a/rust/operator-binary/src/history/mod.rs b/rust/operator-binary/src/history/mod.rs index f0f0bf2b..a8b727c6 100644 --- a/rust/operator-binary/src/history/mod.rs +++ b/rust/operator-binary/src/history/mod.rs @@ -5,7 +5,7 @@ use crate::crd::constants::{ }; pub mod config; -pub mod history_controller; +pub mod controller; pub mod operations; pub mod service; diff --git a/rust/operator-binary/src/main.rs b/rust/operator-binary/src/main.rs index 5f6024fb..6ae07183 100644 --- a/rust/operator-binary/src/main.rs +++ b/rust/operator-binary/src/main.rs @@ -7,7 +7,7 @@ use anyhow::anyhow; use clap::Parser; use connect::crd::{CONNECT_FULL_CONTROLLER_NAME, SparkConnectServer}; use futures::{FutureExt, StreamExt, TryFutureExt}; -use history::history_controller; +use history::controller; use product_config::ProductConfigManager; use stackable_operator::{ YamlSchema, @@ -272,8 +272,8 @@ async fn main() -> anyhow::Result<()> { ) .graceful_shutdown_on(sigterm_watcher.handle()) .run( - history_controller::reconcile, - history_controller::error_policy, + controller::reconcile, + controller::error_policy, Arc::new(ctx), ) .instrument(info_span!("history_controller")) diff --git a/rust/operator-binary/src/spark_k8s_controller.rs b/rust/operator-binary/src/spark_k8s_controller.rs index 7bc059ea..ca95acdb 100644 --- a/rust/operator-binary/src/spark_k8s_controller.rs +++ b/rust/operator-binary/src/spark_k8s_controller.rs @@ -16,10 +16,7 @@ use stackable_operator::{ volume::VolumeBuilder, }, }, - commons::{ - product_image_selection::{self, ResolvedProductImage}, - tls_verification::{CaCert, TlsVerification}, - }, + commons::product_image_selection::ResolvedProductImage, crd::s3, k8s_openapi::{ DeepMerge, Resource, @@ -39,7 +36,6 @@ use stackable_operator::{ }, kvp::Label, logging::controller::ReconcilerError, - product_config_utils::ValidatedRoleConfigByPropertyKind, product_logging::{ framework::{ LoggingError, capture_shell_output, create_vector_shutdown_file_command, @@ -66,21 +62,22 @@ use crate::{ product_logging::{self}, }; +pub mod dereference; +pub mod validate; + #[derive(Snafu, Debug, EnumDiscriminants)] #[strum_discriminants(derive(IntoStaticStr))] #[allow(clippy::enum_variant_names)] pub enum Error { - #[snafu(display("failed to merge application templates"))] - MergeApplicationTemplates { - source: crate::crd::template_spec::Error, - }, + #[snafu(display("failed to dereference SparkApplication"))] + DereferenceSparkApplication { source: dereference::Error }, + + #[snafu(display("failed to validate SparkApplication"))] + ValidateSparkApplication { source: validate::Error }, #[snafu(display("missing secret lifetime"))] MissingSecretLifetime, - #[snafu(display("object has no namespace"))] - ObjectHasNoNamespace, - #[snafu(display("object is missing metadata to build owner reference"))] ObjectMissingMetadataForOwnerRef { source: stackable_operator::builder::meta::Error, @@ -112,36 +109,14 @@ pub enum Error { #[snafu(display("pod template serialization"))] PodTemplateSerde { source: serde_yaml::Error }, - #[snafu(display("failed to configure S3 bucket"))] - ConfigureS3Bucket { - source: stackable_operator::crd::s3::v1alpha1::BucketError, - }, - - #[snafu(display("failed to configure S3 connection"))] - ConfigureS3Connection { - source: stackable_operator::crd::s3::v1alpha1::ConnectionError, - }, - - #[snafu(display("tls non-verification not supported"))] - S3TlsNoVerificationNotSupported, - - #[snafu(display("ca-cert verification not supported"))] - S3TlsCaVerificationNotSupported, - #[snafu(display("failed to resolve and merge config"))] FailedToResolveConfig { source: crate::crd::Error }, - #[snafu(display("failed to recognise the container name"))] - UnrecognisedContainerName, - #[snafu(display("illegal container name"))] IllegalContainerName { source: stackable_operator::builder::pod::container::Error, }, - #[snafu(display("failed to resolve the log dir configuration"))] - LogDir { source: crate::crd::logdir::Error }, - #[snafu(display("vector agent is enabled but vector aggregator ConfigMap is missing"))] VectorAggregatorConfigMapMissing, @@ -160,14 +135,6 @@ pub enum Error { role: SparkApplicationRole, }, - #[snafu(display("failed to generate product config"))] - GenerateProductConfig { - source: stackable_operator::product_config_utils::Error, - }, - - #[snafu(display("invalid product config"))] - InvalidProductConfig { source: crate::crd::Error }, - #[snafu(display("invalid submit config"))] SubmitConfig { source: crate::crd::Error }, @@ -181,12 +148,6 @@ pub enum Error { source: stackable_operator::builder::meta::Error, }, - #[snafu(display("failed to get required Labels"))] - GetRequiredLabels { - source: - stackable_operator::kvp::KeyValuePairError, - }, - #[snafu(display("failed to create Volumes for SparkApplication"))] CreateVolumes { source: crate::crd::Error }, @@ -208,11 +169,6 @@ pub enum Error { InvalidSparkApplication { source: error_boundary::InvalidObject, }, - - #[snafu(display("failed to resolve product image"))] - ResolveProductImage { - source: product_image_selection::Error, - }, } type Result = std::result::Result; @@ -247,78 +203,26 @@ pub async fn reconcile( // It is important to do this at the top of the reconciliation function to ensure // all referenced resources and configuration are merged before any of them are created. - let merged_template_result = - &crate::crd::template_spec::merge_application_templates(client, spark_application) - .await - .context(MergeApplicationTemplatesSnafu)?; - let spark_application = match &merged_template_result.app { - Some(app) => app, - None => spark_application, - }; + let dereferenced = dereference::dereference(client, spark_application) + .await + .context(DereferenceSparkApplicationSnafu)?; + + let validated = + validate::validate(dereferenced, &ctx.operator_environment, &ctx.product_config) + .context(ValidateSparkApplicationSnafu)?; + + let spark_application = &validated.dereferenced.spark_application; + let opt_s3conn = &validated.dereferenced.s3_connection; + let logdir = &validated.dereferenced.log_dir; + let resolved_product_image = &validated.resolved_product_image; + let validated_product_config = &validated.product_config; // This is the final version of the spark app to reconcile. // No more mutating operations after this point (except for status). tracing::debug!("reconciling spark application [{spark_application:?}]"); - let opt_s3conn = match spark_application.spec.s3connection.as_ref() { - Some(s3bd) => Some( - s3bd.clone() - .resolve( - client, - // TODO (@NickLarsenNZ): Explain this unwrap. Either convert to expect, or gracefully handle the error. - spark_application.metadata.namespace.as_deref().unwrap(), - ) - .await - .context(ConfigureS3ConnectionSnafu)?, - ), - _ => None, - }; - - // check early for valid verification options - if let Some(conn) = opt_s3conn.as_ref() { - if let Some(tls) = &conn.tls.tls { - match &tls.verification { - TlsVerification::None {} => return S3TlsNoVerificationNotSupportedSnafu.fail(), - TlsVerification::Server(server_verification) => { - match &server_verification.ca_cert { - CaCert::WebPki {} => {} - CaCert::SecretClass(_) => {} - } - } - } - } - } - - let logdir = if let Some(log_file_dir) = &spark_application.spec.log_file_directory { - Some( - ResolvedLogDir::resolve( - log_file_dir, - spark_application.metadata.namespace.clone(), - client, - ) - .await - .context(LogDirSnafu)?, - ) - } else { - None - }; - - let resolved_product_image = spark_application - .spec - .spark_image - .resolve( - CONTAINER_IMAGE_BASE_NAME, - &ctx.operator_environment.image_repository, - crate::built_info::PKG_VERSION, - ) - .context(ResolveProductImageSnafu)?; - - let validated_product_config: ValidatedRoleConfigByPropertyKind = spark_application - .validated_role_config(&resolved_product_image, &ctx.product_config) - .context(InvalidProductConfigSnafu)?; - let (serviceaccount, rolebinding) = - build_spark_role_serviceaccount(spark_application, &resolved_product_image)?; + build_spark_role_serviceaccount(spark_application, resolved_product_image)?; client .apply_patch(SPARK_CONTROLLER_NAME, &serviceaccount, &serviceaccount) .await @@ -328,7 +232,7 @@ pub async fn reconcile( .await .context(ApplyRoleBindingSnafu)?; - let env_vars = spark_application.env(&opt_s3conn, &logdir); + let env_vars = spark_application.env(opt_s3conn, logdir); let driver_config = spark_application .driver_config() @@ -345,9 +249,9 @@ pub async fn reconcile( &driver_config, driver_product_config, &env_vars, - &opt_s3conn, - &logdir, - &resolved_product_image, + opt_s3conn, + logdir, + resolved_product_image, &serviceaccount, )?; client @@ -374,9 +278,9 @@ pub async fn reconcile( &executor_config, executor_product_config, &env_vars, - &opt_s3conn, - &logdir, - &resolved_product_image, + opt_s3conn, + logdir, + resolved_product_image, &serviceaccount, )?; client @@ -389,7 +293,7 @@ pub async fn reconcile( .context(ApplyApplicationSnafu)?; let job_commands = spark_application - .build_command(&opt_s3conn, &logdir, &resolved_product_image.image) + .build_command(opt_s3conn, logdir, &resolved_product_image.image) .context(BuildCommandSnafu)?; let submit_config = spark_application @@ -404,7 +308,7 @@ pub async fn reconcile( let submit_job_config_map = submit_job_config_map( spark_application, submit_product_config, - &resolved_product_image, + resolved_product_image, )?; client .apply_patch( @@ -417,12 +321,12 @@ pub async fn reconcile( let job = spark_job( spark_application, - &resolved_product_image, + resolved_product_image, &serviceaccount, &env_vars, &job_commands, - &opt_s3conn, - &logdir, + opt_s3conn, + logdir, &submit_config, )?; client @@ -439,7 +343,7 @@ pub async fn reconcile( spark_application, &v1alpha1::SparkApplicationStatus { phase: "Unknown".to_string(), - resolved_template_ref: merged_template_result.resolved_template_ref.clone(), + resolved_template_ref: validated.dereferenced.resolved_template_refs.clone(), }, ) .await diff --git a/rust/operator-binary/src/spark_k8s_controller/dereference.rs b/rust/operator-binary/src/spark_k8s_controller/dereference.rs new file mode 100644 index 00000000..48ad20dd --- /dev/null +++ b/rust/operator-binary/src/spark_k8s_controller/dereference.rs @@ -0,0 +1,115 @@ +//! The dereference step in the SparkApplication controller. +//! +//! Fetches all Kubernetes objects referenced by the SparkApplication spec (templates, S3 +//! connection, log directory) and returns them in [`DereferencedSparkApplication`]. +//! Synchronous validation belongs in the sibling [`super::validate`] module. + +use snafu::{ResultExt, Snafu}; +use stackable_operator::{ + client::Client, + commons::tls_verification::{CaCert, TlsVerification}, + crd::s3, +}; + +use crate::crd::{ + logdir::ResolvedLogDir, + template_spec::{self}, + v1alpha1, +}; + +#[derive(Snafu, Debug)] +pub enum Error { + #[snafu(display("failed to merge application templates"))] + MergeApplicationTemplates { source: template_spec::Error }, + + #[snafu(display("failed to configure S3 connection"))] + ConfigureS3Connection { + source: stackable_operator::crd::s3::v1alpha1::ConnectionError, + }, + + #[snafu(display("S3 TLS verification with no verification is not supported"))] + S3TlsNoVerificationNotSupported, + + #[snafu(display("failed to resolve log directory"))] + LogDir { source: crate::crd::logdir::Error }, + + #[snafu(display("object has no namespace"))] + ObjectHasNoNamespace, +} + +type Result = std::result::Result; + +/// Kubernetes objects referenced from a SparkApplication, already fetched. +pub struct DereferencedSparkApplication { + /// SparkApplication after merging any referenced templates. + pub spark_application: v1alpha1::SparkApplication, + /// Resolved template references for status reporting. + pub resolved_template_refs: Vec, + /// Resolved S3 connection, if `spec.s3connection` is set. + pub s3_connection: Option, + /// Resolved log directory, if `spec.log_file_directory` is set. + pub log_dir: Option, +} + +/// Fetches all Kubernetes objects referenced from the given SparkApplication. +pub async fn dereference( + client: &Client, + spark_application: &v1alpha1::SparkApplication, +) -> Result { + // 1. Template merging — must happen first so subsequent lookups see the merged spec. + let merged = template_spec::merge_application_templates(client, spark_application) + .await + .context(MergeApplicationTemplatesSnafu)?; + let merged_app = merged.app.unwrap_or_else(|| spark_application.clone()); + let resolved_template_refs = merged.resolved_template_ref; + + let namespace = merged_app + .metadata + .namespace + .as_deref() + .ok_or(Error::ObjectHasNoNamespace)?; + + // 2. S3 connection. + let s3_connection = match merged_app.spec.s3connection.as_ref() { + Some(s3bd) => Some( + s3bd.clone() + .resolve(client, namespace) + .await + .context(ConfigureS3ConnectionSnafu)?, + ), + None => None, + }; + + // Early "no verification" rejection — preserves today's behavior (was inline at + // spark_k8s_controller.rs:278–290 before this refactor). + if let Some(conn) = s3_connection.as_ref() { + if let Some(tls) = &conn.tls.tls { + match &tls.verification { + TlsVerification::None {} => return S3TlsNoVerificationNotSupportedSnafu.fail(), + TlsVerification::Server(server_verification) => { + match &server_verification.ca_cert { + CaCert::WebPki {} => {} + CaCert::SecretClass(_) => {} + } + } + } + } + } + + // 3. Log directory (also pulls S3Bucket + TLS secret internally). + let log_dir = match merged_app.spec.log_file_directory.as_ref() { + Some(log_file_dir) => Some( + ResolvedLogDir::resolve(log_file_dir, merged_app.metadata.namespace.clone(), client) + .await + .context(LogDirSnafu)?, + ), + None => None, + }; + + Ok(DereferencedSparkApplication { + spark_application: merged_app, + resolved_template_refs, + s3_connection, + log_dir, + }) +} diff --git a/rust/operator-binary/src/spark_k8s_controller/validate.rs b/rust/operator-binary/src/spark_k8s_controller/validate.rs new file mode 100644 index 00000000..48c61a9a --- /dev/null +++ b/rust/operator-binary/src/spark_k8s_controller/validate.rs @@ -0,0 +1,65 @@ +//! The validate step in the SparkApplication controller. +//! +//! Synchronously validates the [`super::dereference::DereferencedSparkApplication`] and +//! resolves the product image and product config. Does not touch the Kubernetes API. + +use product_config::ProductConfigManager; +use snafu::{ResultExt, Snafu}; +use stackable_operator::{ + cli::OperatorEnvironmentOptions, + commons::product_image_selection::{self, ResolvedProductImage}, + product_config_utils::ValidatedRoleConfigByPropertyKind, +}; + +use crate::{ + crd::constants::CONTAINER_IMAGE_BASE_NAME, + spark_k8s_controller::dereference::DereferencedSparkApplication, +}; + +#[derive(Snafu, Debug)] +pub enum Error { + #[snafu(display("failed to resolve product image"))] + ResolveProductImage { + source: product_image_selection::Error, + }, + + #[snafu(display("invalid product config"))] + InvalidProductConfig { source: crate::crd::Error }, +} + +type Result = std::result::Result; + +/// Inputs the rest of `reconcile` needs after dereferencing. +pub struct ValidatedSparkApplication { + pub dereferenced: DereferencedSparkApplication, + pub resolved_product_image: ResolvedProductImage, + pub product_config: ValidatedRoleConfigByPropertyKind, +} + +pub fn validate( + dereferenced: DereferencedSparkApplication, + operator_environment: &OperatorEnvironmentOptions, + product_config: &ProductConfigManager, +) -> Result { + let resolved_product_image = dereferenced + .spark_application + .spec + .spark_image + .resolve( + CONTAINER_IMAGE_BASE_NAME, + &operator_environment.image_repository, + crate::built_info::PKG_VERSION, + ) + .context(ResolveProductImageSnafu)?; + + let product_config = dereferenced + .spark_application + .validated_role_config(&resolved_product_image, product_config) + .context(InvalidProductConfigSnafu)?; + + Ok(ValidatedSparkApplication { + dereferenced, + resolved_product_image, + product_config, + }) +} diff --git a/tests/templates/kuttl/smoke/42-assert.yaml b/tests/templates/kuttl/smoke/42-assert.yaml new file mode 100644 index 00000000..707c699b --- /dev/null +++ b/tests/templates/kuttl/smoke/42-assert.yaml @@ -0,0 +1,170 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +timeout: 60 +--- +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: spark-history-node-default + labels: + app.kubernetes.io/component: node + app.kubernetes.io/instance: spark-history + app.kubernetes.io/managed-by: spark.stackable.tech_history + app.kubernetes.io/name: spark-history + app.kubernetes.io/role-group: default + stackable.tech/vendor: Stackable + ownerReferences: + - apiVersion: spark.stackable.tech/v1alpha1 + controller: true + kind: SparkHistoryServer + name: spark-history +spec: + replicas: 1 + selector: + matchLabels: + app.kubernetes.io/component: node + app.kubernetes.io/instance: spark-history + app.kubernetes.io/name: spark-history + app.kubernetes.io/role-group: default +status: + readyReplicas: 1 + replicas: 1 +--- +apiVersion: v1 +kind: ConfigMap +metadata: + name: spark-history-node-default + labels: + app.kubernetes.io/component: node + app.kubernetes.io/instance: spark-history + app.kubernetes.io/managed-by: spark.stackable.tech_history + app.kubernetes.io/name: spark-history + app.kubernetes.io/role-group: default + stackable.tech/vendor: Stackable + ownerReferences: + - apiVersion: spark.stackable.tech/v1alpha1 + controller: true + kind: SparkHistoryServer + name: spark-history +--- +apiVersion: v1 +kind: Service +metadata: + name: spark-history-node-default-metrics + annotations: + prometheus.io/path: /metrics + prometheus.io/port: "18081" + prometheus.io/scheme: http + prometheus.io/scrape: "true" + labels: + app.kubernetes.io/component: node + app.kubernetes.io/instance: spark-history + app.kubernetes.io/managed-by: spark.stackable.tech_history + app.kubernetes.io/name: spark-history + app.kubernetes.io/role-group: default + prometheus.io/scrape: "true" + stackable.tech/vendor: Stackable + ownerReferences: + - apiVersion: spark.stackable.tech/v1alpha1 + controller: true + kind: SparkHistoryServer + name: spark-history +spec: + clusterIP: None + ports: + - name: metrics + port: 18081 + protocol: TCP + targetPort: 18081 + publishNotReadyAddresses: true + selector: + app.kubernetes.io/component: node + app.kubernetes.io/instance: spark-history + app.kubernetes.io/name: spark-history + app.kubernetes.io/role-group: default + type: ClusterIP +--- +apiVersion: listeners.stackable.tech/v1alpha1 +kind: Listener +metadata: + name: spark-history-node + labels: + app.kubernetes.io/component: node + app.kubernetes.io/instance: spark-history + app.kubernetes.io/managed-by: spark.stackable.tech_history + app.kubernetes.io/name: spark-history + app.kubernetes.io/role-group: none + stackable.tech/vendor: Stackable + ownerReferences: + - apiVersion: spark.stackable.tech/v1alpha1 + controller: true + kind: SparkHistoryServer + name: spark-history +spec: + className: cluster-internal + ports: + - name: http + port: 18080 + protocol: TCP +--- +apiVersion: v1 +kind: ServiceAccount +metadata: + name: spark-history-serviceaccount + labels: + app.kubernetes.io/instance: spark-history + app.kubernetes.io/managed-by: spark.stackable.tech_history + app.kubernetes.io/name: spark-history + ownerReferences: + - apiVersion: spark.stackable.tech/v1alpha1 + controller: true + kind: SparkHistoryServer + name: spark-history +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: RoleBinding +metadata: + name: spark-history-rolebinding + labels: + app.kubernetes.io/instance: spark-history + app.kubernetes.io/managed-by: spark.stackable.tech_history + app.kubernetes.io/name: spark-history + ownerReferences: + - apiVersion: spark.stackable.tech/v1alpha1 + controller: true + kind: SparkHistoryServer + name: spark-history +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: ClusterRole + name: spark-history-clusterrole +subjects: + - kind: ServiceAccount + name: spark-history-serviceaccount +--- +apiVersion: policy/v1 +kind: PodDisruptionBudget +metadata: + name: spark-history-node + labels: + app.kubernetes.io/component: node + app.kubernetes.io/instance: spark-history + app.kubernetes.io/managed-by: spark.stackable.tech_history + app.kubernetes.io/name: spark-history + ownerReferences: + - apiVersion: spark.stackable.tech/v1alpha1 + controller: true + kind: SparkHistoryServer + name: spark-history +spec: + maxUnavailable: 1 + selector: + matchLabels: + app.kubernetes.io/component: node + app.kubernetes.io/instance: spark-history + app.kubernetes.io/name: spark-history +status: + currentHealthy: 1 + disruptionsAllowed: 1 + expectedPods: 1 diff --git a/tests/templates/kuttl/smoke/43-assert.yaml.j2 b/tests/templates/kuttl/smoke/43-assert.yaml.j2 new file mode 100644 index 00000000..8bdacc29 --- /dev/null +++ b/tests/templates/kuttl/smoke/43-assert.yaml.j2 @@ -0,0 +1,40 @@ +--- +# Snapshot the full `.data` of each operator-managed ConfigMap. +# Any code change that alters rendered config values will fail these diffs. +# +# Runs as its own step (after 40/41/42) so kuttl does not re-evaluate the heavy +# heredoc on every 1-second readiness retry of the install step. By this point +# the cluster is in steady state, so each script runs once. +# +# The heredoc is quoted (`<<'YAMLEOF'`) so shell substitution is disabled and +# any property-style escapes survive verbatim. Both sides are normalized to +# canonical JSON via `yq -o=json` before comparison. +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +timeout: 60 +commands: + - script: | + expected=$(cat <<'YAMLEOF' | yq -o=json + security.properties: "" + spark-defaults.conf: |- +{% if test_scenario['values']['s3-use-tls'] == 'true' %} + spark.hadoop.fs.s3a.endpoint https://eventlog-minio:9000/ +{% else %} + spark.hadoop.fs.s3a.endpoint http://eventlog-minio:9000/ +{% endif %} + spark.hadoop.fs.s3a.endpoint.region us-east-1 + spark.hadoop.fs.s3a.path.style.access true + spark.history.fs.cleaner.enabled true + spark.history.fs.logDirectory s3a://spark-logs/eventlogs/ + spark-env.sh: "" + YAMLEOF + ) + actual=$(kubectl -n $NAMESPACE get cm spark-history-node-default -o yaml | yq -o=json '.data') + if [ "$expected" != "$actual" ]; then + echo "ERROR: ConfigMap spark-history-node-default data drifted from snapshot." + echo "=== expected ===" + printf '%s\n' "$expected" + echo "=== actual ===" + printf '%s\n' "$actual" + exit 1 + fi diff --git a/tests/templates/kuttl/smoke/52-assert.yaml b/tests/templates/kuttl/smoke/52-assert.yaml new file mode 100644 index 00000000..8404522b --- /dev/null +++ b/tests/templates/kuttl/smoke/52-assert.yaml @@ -0,0 +1,96 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +timeout: 60 +--- +apiVersion: v1 +kind: ConfigMap +metadata: + name: spark-pi-s3-1-driver-pod-template + labels: + app.kubernetes.io/component: pod-templates + app.kubernetes.io/instance: spark-pi-s3-1 + app.kubernetes.io/managed-by: spark.stackable.tech_sparkapplication + app.kubernetes.io/name: spark-k8s + app.kubernetes.io/role-group: sparkapplication + stackable.tech/vendor: Stackable + ownerReferences: + - apiVersion: spark.stackable.tech/v1alpha1 + controller: true + kind: SparkApplication + name: spark-pi-s3-1 +--- +apiVersion: v1 +kind: ConfigMap +metadata: + name: spark-pi-s3-1-executor-pod-template + labels: + app.kubernetes.io/component: pod-templates + app.kubernetes.io/instance: spark-pi-s3-1 + app.kubernetes.io/managed-by: spark.stackable.tech_sparkapplication + app.kubernetes.io/name: spark-k8s + app.kubernetes.io/role-group: sparkapplication + stackable.tech/vendor: Stackable + ownerReferences: + - apiVersion: spark.stackable.tech/v1alpha1 + controller: true + kind: SparkApplication + name: spark-pi-s3-1 +--- +apiVersion: v1 +kind: ConfigMap +metadata: + name: spark-pi-s3-1-submit-job + labels: + app.kubernetes.io/component: spark-submit + app.kubernetes.io/instance: spark-pi-s3-1 + app.kubernetes.io/managed-by: spark.stackable.tech_sparkapplication + app.kubernetes.io/name: spark-k8s + app.kubernetes.io/role-group: sparkapplication + stackable.tech/vendor: Stackable + ownerReferences: + - apiVersion: spark.stackable.tech/v1alpha1 + controller: true + kind: SparkApplication + name: spark-pi-s3-1 +--- +apiVersion: v1 +kind: ServiceAccount +metadata: + name: spark-pi-s3-1 + labels: + app.kubernetes.io/component: service-account + app.kubernetes.io/instance: spark-pi-s3-1 + app.kubernetes.io/managed-by: spark.stackable.tech_sparkapplication + app.kubernetes.io/name: spark-k8s + app.kubernetes.io/role-group: sparkapplication + stackable.tech/vendor: Stackable + ownerReferences: + - apiVersion: spark.stackable.tech/v1alpha1 + controller: true + kind: SparkApplication + name: spark-pi-s3-1 +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: RoleBinding +metadata: + name: spark-pi-s3-1 + labels: + app.kubernetes.io/component: role-binding + app.kubernetes.io/instance: spark-pi-s3-1 + app.kubernetes.io/managed-by: spark.stackable.tech_sparkapplication + app.kubernetes.io/name: spark-k8s + app.kubernetes.io/role-group: sparkapplication + stackable.tech/vendor: Stackable + ownerReferences: + - apiVersion: spark.stackable.tech/v1alpha1 + controller: true + kind: SparkApplication + name: spark-pi-s3-1 +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: ClusterRole + name: spark-k8s-clusterrole +subjects: + - kind: ServiceAccount + name: spark-pi-s3-1 diff --git a/tests/templates/kuttl/smoke/53-assert.yaml.j2 b/tests/templates/kuttl/smoke/53-assert.yaml.j2 new file mode 100644 index 00000000..55a96203 --- /dev/null +++ b/tests/templates/kuttl/smoke/53-assert.yaml.j2 @@ -0,0 +1,405 @@ +--- +# Snapshot the full `.data` of each SparkApplication-owned ConfigMap. +# Any code change that alters rendered config values will fail these diffs. +# +# Runs as its own step (after 50/51/52) so kuttl does not re-evaluate the heavy +# heredoc on every 1-second readiness retry of the install step. By this point +# the cluster is in steady state, so each script runs once. +# +# The heredoc is quoted (`<<'YAMLEOF'`) so shell substitution is disabled and +# any property-style escapes survive verbatim. Both sides are normalized to +# canonical JSON via `yq -o=json` before comparison. +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +timeout: 60 +commands: + - script: | + expected=$(cat <<'YAMLEOF' | yq -o=json + log4j2.properties: |- + appenders = FILE, CONSOLE + + appender.CONSOLE.type = Console + appender.CONSOLE.name = CONSOLE + appender.CONSOLE.target = SYSTEM_ERR + appender.CONSOLE.layout.type = PatternLayout + appender.CONSOLE.layout.pattern = %d{ISO8601} %p [%t] %c - %m%n + appender.CONSOLE.filter.threshold.type = ThresholdFilter + appender.CONSOLE.filter.threshold.level = INFO + + appender.FILE.type = RollingFile + appender.FILE.name = FILE + appender.FILE.fileName = /stackable/log/spark/spark.log4j2.xml + appender.FILE.filePattern = /stackable/log/spark/spark.log4j2.xml.%i + appender.FILE.layout.type = XMLLayout + appender.FILE.policies.type = Policies + appender.FILE.policies.size.type = SizeBasedTriggeringPolicy + appender.FILE.policies.size.size = 5MB + appender.FILE.strategy.type = DefaultRolloverStrategy + appender.FILE.strategy.max = 1 + appender.FILE.filter.threshold.type = ThresholdFilter + appender.FILE.filter.threshold.level = INFO + + + rootLogger.level=INFO + rootLogger.appenderRefs = CONSOLE, FILE + rootLogger.appenderRef.CONSOLE.ref = CONSOLE + rootLogger.appenderRef.FILE.ref = FILE + security.properties: | + networkaddress.cache.negative.ttl=0 + networkaddress.cache.ttl=30 + spark-env.sh: "" + template.yaml: | + metadata: + labels: + app.kubernetes.io/component: spark + app.kubernetes.io/instance: spark-pi-s3-1 + app.kubernetes.io/managed-by: spark.stackable.tech_sparkapplication + app.kubernetes.io/name: spark-k8s + app.kubernetes.io/role-group: sparkapplication + app.kubernetes.io/version: 4.1.1-stackable0.0.0-dev + prometheus.io/scrape: 'true' + stackable.tech/vendor: Stackable + name: spark + spec: + affinity: {} + containers: + - env: + - name: CONTAINERDEBUG_LOG_DIRECTORY + value: /stackable/log/containerdebug +{% if test_scenario['values']['s3-use-tls'] == 'true' %} + - name: STACKABLE_TLS_STORE_PASSWORD + value: changeit +{% endif %} + - name: _STACKABLE_PRE_HOOK + value: containerdebug --output=/stackable/log/containerdebug-state.json --loop & + image: oci.stackable.tech/sdp/spark-k8s:4.1.1-stackable0.0.0-dev + imagePullPolicy: IfNotPresent + name: spark + resources: + limits: + cpu: '1' + memory: 1Gi + requests: + cpu: 250m + memory: 1Gi + volumeMounts: + - mountPath: /stackable/secrets/s3-credentials-class + name: s3-credentials-class + - mountPath: /stackable/secrets/history-credentials-class + name: history-credentials-class + - mountPath: /stackable/log_config + name: log-config + - mountPath: /stackable/log + name: log +{% if test_scenario['values']['s3-use-tls'] == 'true' %} + - mountPath: /stackable/truststore + name: stackable-truststore + - mountPath: /stackable/mount_server_tls/minio-tls-eventlog + name: minio-tls-eventlog +{% endif %} + enableServiceLinks: false +{% if test_scenario['values']['s3-use-tls'] == 'true' %} + initContainers: + - args: + - |- + cert-tools generate-pkcs12-truststore --pem /etc/pki/ca-trust/extracted/pem/tls-ca-bundle.pem --out /stackable/truststore/truststore.p12 --out-password changeit + cert-tools generate-pkcs12-truststore --pkcs12 /stackable/truststore/truststore.p12:changeit --pkcs12 /stackable/mount_server_tls/minio-tls-eventlog/truststore.p12 --out /stackable/truststore/truststore.p12 --out-password changeit + command: + - /bin/bash + - -x + - -euo + - pipefail + - -c + image: oci.stackable.tech/sdp/spark-k8s:4.1.1-stackable0.0.0-dev + name: tls + resources: + limits: + cpu: 1000m + memory: 1024Mi + requests: + cpu: 250m + memory: 1024Mi + volumeMounts: + - mountPath: /stackable/mount_server_tls/minio-tls-eventlog + name: minio-tls-eventlog + - mountPath: /stackable/truststore + name: stackable-truststore +{% endif %} + securityContext: + fsGroup: 1000 + serviceAccountName: spark-pi-s3-1 + volumes: + - ephemeral: + volumeClaimTemplate: + metadata: + annotations: + secrets.stackable.tech/class: history-credentials-class + secrets.stackable.tech/provision-parts: public-private + spec: + accessModes: + - ReadWriteOnce + resources: + requests: + storage: '1' + storageClassName: secrets.stackable.tech + name: history-credentials-class + - emptyDir: + sizeLimit: 39Mi + name: log + - configMap: + name: spark-pi-s3-1-driver-pod-template + name: log-config +{% if test_scenario['values']['s3-use-tls'] == 'true' %} + - ephemeral: + volumeClaimTemplate: + metadata: + annotations: + secrets.stackable.tech/backend.autotls.cert.lifetime: 1d + secrets.stackable.tech/class: minio-tls-eventlog + secrets.stackable.tech/format: tls-pkcs12 + secrets.stackable.tech/provision-parts: public + spec: + accessModes: + - ReadWriteOnce + resources: + requests: + storage: '1' + storageClassName: secrets.stackable.tech + name: minio-tls-eventlog +{% endif %} + - ephemeral: + volumeClaimTemplate: + metadata: + annotations: + secrets.stackable.tech/class: s3-credentials-class + secrets.stackable.tech/provision-parts: public-private + spec: + accessModes: + - ReadWriteOnce + resources: + requests: + storage: '1' + storageClassName: secrets.stackable.tech + name: s3-credentials-class +{% if test_scenario['values']['s3-use-tls'] == 'true' %} + - emptyDir: + sizeLimit: 5Mi + name: stackable-truststore +{% endif %} + - configMap: + name: spark-pi-s3-1-driver-pod-template + name: config + YAMLEOF + ) + actual=$(kubectl -n $NAMESPACE get cm spark-pi-s3-1-driver-pod-template -o yaml | yq -o=json '.data') + if [ "$expected" != "$actual" ]; then + echo "ERROR: ConfigMap spark-pi-s3-1-driver-pod-template data drifted from snapshot." + echo "=== expected ===" + printf '%s\n' "$expected" + echo "=== actual ===" + printf '%s\n' "$actual" + exit 1 + fi + - script: | + expected=$(cat <<'YAMLEOF' | yq -o=json + log4j2.properties: |- + appenders = FILE, CONSOLE + + appender.CONSOLE.type = Console + appender.CONSOLE.name = CONSOLE + appender.CONSOLE.target = SYSTEM_ERR + appender.CONSOLE.layout.type = PatternLayout + appender.CONSOLE.layout.pattern = %d{ISO8601} %p [%t] %c - %m%n + appender.CONSOLE.filter.threshold.type = ThresholdFilter + appender.CONSOLE.filter.threshold.level = INFO + + appender.FILE.type = RollingFile + appender.FILE.name = FILE + appender.FILE.fileName = /stackable/log/spark/spark.log4j2.xml + appender.FILE.filePattern = /stackable/log/spark/spark.log4j2.xml.%i + appender.FILE.layout.type = XMLLayout + appender.FILE.policies.type = Policies + appender.FILE.policies.size.type = SizeBasedTriggeringPolicy + appender.FILE.policies.size.size = 5MB + appender.FILE.strategy.type = DefaultRolloverStrategy + appender.FILE.strategy.max = 1 + appender.FILE.filter.threshold.type = ThresholdFilter + appender.FILE.filter.threshold.level = INFO + + + rootLogger.level=INFO + rootLogger.appenderRefs = CONSOLE, FILE + rootLogger.appenderRef.CONSOLE.ref = CONSOLE + rootLogger.appenderRef.FILE.ref = FILE + security.properties: | + networkaddress.cache.negative.ttl=0 + networkaddress.cache.ttl=30 + spark-env.sh: "" + template.yaml: | + metadata: + labels: + app.kubernetes.io/component: spark + app.kubernetes.io/instance: spark-pi-s3-1 + app.kubernetes.io/managed-by: spark.stackable.tech_sparkapplication + app.kubernetes.io/name: spark-k8s + app.kubernetes.io/role-group: sparkapplication + app.kubernetes.io/version: 4.1.1-stackable0.0.0-dev + stackable.tech/vendor: Stackable + name: spark + spec: + affinity: {} + containers: + - env: + - name: CONTAINERDEBUG_LOG_DIRECTORY + value: /stackable/log/containerdebug +{% if test_scenario['values']['s3-use-tls'] == 'true' %} + - name: STACKABLE_TLS_STORE_PASSWORD + value: changeit +{% endif %} + - name: _STACKABLE_PRE_HOOK + value: containerdebug --output=/stackable/log/containerdebug-state.json --loop & + image: oci.stackable.tech/sdp/spark-k8s:4.1.1-stackable0.0.0-dev + imagePullPolicy: IfNotPresent + name: spark + resources: + limits: + cpu: '1' + memory: 1Gi + requests: + cpu: 250m + memory: 1Gi + volumeMounts: + - mountPath: /stackable/secrets/s3-credentials-class + name: s3-credentials-class + - mountPath: /stackable/secrets/history-credentials-class + name: history-credentials-class + - mountPath: /stackable/log_config + name: log-config + - mountPath: /stackable/log + name: log +{% if test_scenario['values']['s3-use-tls'] == 'true' %} + - mountPath: /stackable/truststore + name: stackable-truststore + - mountPath: /stackable/mount_server_tls/minio-tls-eventlog + name: minio-tls-eventlog +{% endif %} + enableServiceLinks: false +{% if test_scenario['values']['s3-use-tls'] == 'true' %} + initContainers: + - args: + - |- + cert-tools generate-pkcs12-truststore --pem /etc/pki/ca-trust/extracted/pem/tls-ca-bundle.pem --out /stackable/truststore/truststore.p12 --out-password changeit + cert-tools generate-pkcs12-truststore --pkcs12 /stackable/truststore/truststore.p12:changeit --pkcs12 /stackable/mount_server_tls/minio-tls-eventlog/truststore.p12 --out /stackable/truststore/truststore.p12 --out-password changeit + command: + - /bin/bash + - -x + - -euo + - pipefail + - -c + image: oci.stackable.tech/sdp/spark-k8s:4.1.1-stackable0.0.0-dev + name: tls + resources: + limits: + cpu: 1000m + memory: 1024Mi + requests: + cpu: 250m + memory: 1024Mi + volumeMounts: + - mountPath: /stackable/mount_server_tls/minio-tls-eventlog + name: minio-tls-eventlog + - mountPath: /stackable/truststore + name: stackable-truststore +{% endif %} + securityContext: + fsGroup: 1000 + serviceAccountName: spark-pi-s3-1 + volumes: + - ephemeral: + volumeClaimTemplate: + metadata: + annotations: + secrets.stackable.tech/class: history-credentials-class + secrets.stackable.tech/provision-parts: public-private + spec: + accessModes: + - ReadWriteOnce + resources: + requests: + storage: '1' + storageClassName: secrets.stackable.tech + name: history-credentials-class + - emptyDir: + sizeLimit: 39Mi + name: log + - configMap: + name: spark-pi-s3-1-executor-pod-template + name: log-config +{% if test_scenario['values']['s3-use-tls'] == 'true' %} + - ephemeral: + volumeClaimTemplate: + metadata: + annotations: + secrets.stackable.tech/backend.autotls.cert.lifetime: 1d + secrets.stackable.tech/class: minio-tls-eventlog + secrets.stackable.tech/format: tls-pkcs12 + secrets.stackable.tech/provision-parts: public + spec: + accessModes: + - ReadWriteOnce + resources: + requests: + storage: '1' + storageClassName: secrets.stackable.tech + name: minio-tls-eventlog +{% endif %} + - ephemeral: + volumeClaimTemplate: + metadata: + annotations: + secrets.stackable.tech/class: s3-credentials-class + secrets.stackable.tech/provision-parts: public-private + spec: + accessModes: + - ReadWriteOnce + resources: + requests: + storage: '1' + storageClassName: secrets.stackable.tech + name: s3-credentials-class +{% if test_scenario['values']['s3-use-tls'] == 'true' %} + - emptyDir: + sizeLimit: 5Mi + name: stackable-truststore +{% endif %} + - configMap: + name: spark-pi-s3-1-executor-pod-template + name: config + YAMLEOF + ) + actual=$(kubectl -n $NAMESPACE get cm spark-pi-s3-1-executor-pod-template -o yaml | yq -o=json '.data') + if [ "$expected" != "$actual" ]; then + echo "ERROR: ConfigMap spark-pi-s3-1-executor-pod-template data drifted from snapshot." + echo "=== expected ===" + printf '%s\n' "$expected" + echo "=== actual ===" + printf '%s\n' "$actual" + exit 1 + fi + - script: | + expected=$(cat <<'YAMLEOF' | yq -o=json + security.properties: | + networkaddress.cache.negative.ttl=0 + networkaddress.cache.ttl=30 + spark-env.sh: "" + YAMLEOF + ) + actual=$(kubectl -n $NAMESPACE get cm spark-pi-s3-1-submit-job -o yaml | yq -o=json '.data') + if [ "$expected" != "$actual" ]; then + echo "ERROR: ConfigMap spark-pi-s3-1-submit-job data drifted from snapshot." + echo "=== expected ===" + printf '%s\n' "$expected" + echo "=== actual ===" + printf '%s\n' "$actual" + exit 1 + fi diff --git a/tests/templates/kuttl/spark-connect/10-assert.yaml b/tests/templates/kuttl/spark-connect/10-assert.yaml index 87f18f8a..41ae37b6 100644 --- a/tests/templates/kuttl/spark-connect/10-assert.yaml +++ b/tests/templates/kuttl/spark-connect/10-assert.yaml @@ -1,25 +1,6 @@ --- apiVersion: kuttl.dev/v1beta1 kind: TestAssert -timeout: 900 ---- -apiVersion: apps/v1 -kind: StatefulSet -metadata: - name: spark-connect-server -status: - readyReplicas: 1 ---- -apiVersion: v1 -kind: Service -metadata: - name: spark-connect-server -spec: - type: NodePort ---- -apiVersion: v1 -kind: Service -metadata: - name: spark-connect-server-headless -spec: - type: ClusterIP +timeout: 300 +commands: + - script: kubectl -n $NAMESPACE wait --for=condition=Available=true sparkconnectservers.spark.stackable.tech/spark-connect --timeout=301s diff --git a/tests/templates/kuttl/spark-connect/12-assert.yaml b/tests/templates/kuttl/spark-connect/12-assert.yaml new file mode 100644 index 00000000..d25e9b1b --- /dev/null +++ b/tests/templates/kuttl/spark-connect/12-assert.yaml @@ -0,0 +1,197 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +timeout: 60 +--- +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: spark-connect-server + labels: + app.kubernetes.io/component: server + app.kubernetes.io/instance: spark-connect + app.kubernetes.io/managed-by: spark.stackable.tech_connect + app.kubernetes.io/name: spark-connect + app.kubernetes.io/role-group: default + stackable.tech/vendor: Stackable + ownerReferences: + - apiVersion: spark.stackable.tech/v1alpha1 + controller: true + kind: SparkConnectServer + name: spark-connect +spec: + replicas: 1 + selector: + matchLabels: + app.kubernetes.io/component: server + app.kubernetes.io/instance: spark-connect + app.kubernetes.io/name: spark-connect + app.kubernetes.io/role-group: default +status: + readyReplicas: 1 + replicas: 1 +--- +apiVersion: v1 +kind: ConfigMap +metadata: + name: spark-connect-server + labels: + app.kubernetes.io/component: server + app.kubernetes.io/instance: spark-connect + app.kubernetes.io/managed-by: spark.stackable.tech_connect + app.kubernetes.io/name: spark-connect + app.kubernetes.io/role-group: default + stackable.tech/vendor: Stackable + ownerReferences: + - apiVersion: spark.stackable.tech/v1alpha1 + controller: true + kind: SparkConnectServer + name: spark-connect +--- +apiVersion: v1 +kind: ConfigMap +metadata: + name: spark-connect-executor + labels: + app.kubernetes.io/component: executor + app.kubernetes.io/instance: spark-connect + app.kubernetes.io/managed-by: spark.stackable.tech_connect + app.kubernetes.io/name: spark-connect + app.kubernetes.io/role-group: default + stackable.tech/vendor: Stackable + ownerReferences: + - apiVersion: spark.stackable.tech/v1alpha1 + controller: true + kind: SparkConnectServer + name: spark-connect +--- +apiVersion: v1 +kind: Service +metadata: + name: spark-connect-server-headless + labels: + app.kubernetes.io/component: server + app.kubernetes.io/instance: spark-connect + app.kubernetes.io/managed-by: spark.stackable.tech_connect + app.kubernetes.io/name: spark-connect + app.kubernetes.io/role-group: default + stackable.tech/vendor: Stackable + ownerReferences: + - apiVersion: spark.stackable.tech/v1alpha1 + controller: true + kind: SparkConnectServer + name: spark-connect +spec: + clusterIP: None + ports: + - name: grpc + port: 15002 + protocol: TCP + targetPort: 15002 + - name: http + port: 4040 + protocol: TCP + targetPort: 4040 + publishNotReadyAddresses: true + selector: + app.kubernetes.io/component: server + app.kubernetes.io/instance: spark-connect + app.kubernetes.io/name: spark-connect + type: ClusterIP +--- +apiVersion: v1 +kind: Service +metadata: + name: spark-connect-server-metrics + annotations: + prometheus.io/path: /metrics/prometheus + prometheus.io/port: "4040" + prometheus.io/scheme: http + prometheus.io/scrape: "true" + labels: + app.kubernetes.io/component: server + app.kubernetes.io/instance: spark-connect + app.kubernetes.io/managed-by: spark.stackable.tech_connect + app.kubernetes.io/name: spark-connect + app.kubernetes.io/role-group: default + prometheus.io/scrape: "true" + stackable.tech/vendor: Stackable + ownerReferences: + - apiVersion: spark.stackable.tech/v1alpha1 + controller: true + kind: SparkConnectServer + name: spark-connect +spec: + clusterIP: None + ports: + - name: metrics + port: 4040 + protocol: TCP + targetPort: 4040 + publishNotReadyAddresses: true + selector: + app.kubernetes.io/component: server + app.kubernetes.io/instance: spark-connect + app.kubernetes.io/name: spark-connect + type: ClusterIP +--- +apiVersion: listeners.stackable.tech/v1alpha1 +kind: Listener +metadata: + name: spark-connect-server + labels: + app.kubernetes.io/component: server + app.kubernetes.io/instance: spark-connect + app.kubernetes.io/managed-by: spark.stackable.tech_connect + app.kubernetes.io/name: spark-connect + app.kubernetes.io/role-group: default + stackable.tech/vendor: Stackable + ownerReferences: + - apiVersion: spark.stackable.tech/v1alpha1 + controller: true + kind: SparkConnectServer + name: spark-connect +spec: + className: external-unstable + ports: + - name: grpc + port: 15002 + protocol: TCP + - name: http + port: 4040 + protocol: TCP +--- +apiVersion: v1 +kind: ServiceAccount +metadata: + name: spark-connect-serviceaccount + labels: + app.kubernetes.io/instance: spark-connect + app.kubernetes.io/managed-by: spark.stackable.tech_connect + app.kubernetes.io/name: spark-connect + ownerReferences: + - apiVersion: spark.stackable.tech/v1alpha1 + controller: true + kind: SparkConnectServer + name: spark-connect +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: RoleBinding +metadata: + name: spark-connect-rolebinding + labels: + app.kubernetes.io/instance: spark-connect + app.kubernetes.io/managed-by: spark.stackable.tech_connect + app.kubernetes.io/name: spark-connect + ownerReferences: + - apiVersion: spark.stackable.tech/v1alpha1 + controller: true + kind: SparkConnectServer + name: spark-connect +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: ClusterRole + name: spark-connect-clusterrole +subjects: + - kind: ServiceAccount + name: spark-connect-serviceaccount diff --git a/tests/templates/kuttl/spark-connect/13-assert.yaml.j2 b/tests/templates/kuttl/spark-connect/13-assert.yaml.j2 new file mode 100644 index 00000000..cd7dae3e --- /dev/null +++ b/tests/templates/kuttl/spark-connect/13-assert.yaml.j2 @@ -0,0 +1,213 @@ +--- +# Snapshot the full `.data` of each operator-managed ConfigMap. +# Any code change that alters rendered config values will fail these diffs. +# +# Runs as its own step (after 10/11/12) so kuttl does not re-evaluate the heavy +# heredoc on every 1-second readiness retry of the install step. +# +# The heredoc is quoted (`<<'YAMLEOF'`) so shell substitution is disabled and +# property-style escapes (`\:`, `\=`) survive verbatim. Only `__NAMESPACE__` is +# substituted afterwards via `sed`, because kuttl tests run in a randomized +# namespace per invocation. Both sides are normalized to canonical JSON via +# `yq -o=json` before comparison. +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +timeout: 60 +commands: + - script: | + expected=$(cat <<'YAMLEOF' | sed "s|__NAMESPACE__|$NAMESPACE|g" | yq -o=json + metrics.properties: | + *.sink.prometheusServlet.class=org.apache.spark.metrics.sink.PrometheusServlet + *.sink.prometheusServlet.path=/metrics/prometheus + security.properties: | + networkaddress.cache.negative.ttl=0 + networkaddress.cache.ttl=30 + spark-defaults.conf: | + spark.driver.defaultJavaOptions=-Djava.security.properties\=/stackable/spark/conf/security.properties\ -Dlog4j.configurationFile\=/stackable/log_config/log4j2.properties\ -Dmy.custom.jvm.arg\=customValue + spark.driver.extraClassPath=/stackable/spark/extra-jars/*\:/stackable/spark/connect/spark-connect-4.1.1.jar +{% if test_scenario['values']['s3-use-tls'] == 'true' %} + spark.driver.extraJavaOptions=-Djavax.net.ssl.trustStore\=/stackable/truststore/truststore.p12\ -Djavax.net.ssl.trustStorePassword\=changeit\ -Djavax.net.ssl.trustStoreType\=pkcs12 +{% endif %} + spark.driver.host=spark-connect-server-headless + spark.executor.defaultJavaOptions=-Djava.security.properties\=/stackable/spark/conf/security.properties\ -Dlog4j.configurationFile\=/stackable/log_config/log4j2.properties +{% if test_scenario['values']['s3-use-tls'] == 'true' %} + spark.executor.extraJavaOptions=-Djavax.net.ssl.trustStore\=/stackable/truststore/truststore.p12\ -Djavax.net.ssl.trustStorePassword\=changeit\ -Djavax.net.ssl.trustStoreType\=pkcs12 +{% endif %} + spark.executor.instances=1 + spark.executor.memory=1024M + spark.executor.memoryOverhead=1m + spark.hadoop.fs.s3a.access.key=${file\:UTF-8\:/stackable/secrets/minio-credentials-class/accessKey} + spark.hadoop.fs.s3a.aws.credentials.provider=org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider + spark.hadoop.fs.s3a.bucket.ingest-bucket.access.key=${file\:UTF-8\:/stackable/secrets/minio-credentials-class/accessKey} + spark.hadoop.fs.s3a.bucket.ingest-bucket.aws.credentials.provider=org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider +{% if test_scenario['values']['s3-use-tls'] == 'true' %} + spark.hadoop.fs.s3a.bucket.ingest-bucket.endpoint=https\://minio\:9000/ +{% else %} + spark.hadoop.fs.s3a.bucket.ingest-bucket.endpoint=http\://minio\:9000/ +{% endif %} + spark.hadoop.fs.s3a.bucket.ingest-bucket.endpoint.region=us-east-1 + spark.hadoop.fs.s3a.bucket.ingest-bucket.path.style.access=true + spark.hadoop.fs.s3a.bucket.ingest-bucket.secret.key=${file\:UTF-8\:/stackable/secrets/minio-credentials-class/secretKey} +{% if test_scenario['values']['s3-use-tls'] == 'true' %} + spark.hadoop.fs.s3a.endpoint=https\://minio\:9000/ +{% else %} + spark.hadoop.fs.s3a.endpoint=http\://minio\:9000/ +{% endif %} + spark.hadoop.fs.s3a.endpoint.region=us-east-1 + spark.hadoop.fs.s3a.path.style.access=true + spark.hadoop.fs.s3a.secret.key=${file\:UTF-8\:/stackable/secrets/minio-credentials-class/secretKey} + spark.jars.ivy=/tmp/ivy2 + spark.kubernetes.authenticate.driver.serviceAccountName=spark-connect-serviceaccount + spark.kubernetes.driver.container.image=oci.stackable.tech/sdp/spark-k8s\:4.1.1-stackable0.0.0-dev + spark.kubernetes.driver.pod.name=${env\:HOSTNAME} + spark.kubernetes.executor.container.image=oci.stackable.tech/sdp/spark-k8s\:4.1.1-stackable0.0.0-dev + spark.kubernetes.executor.limit.cores=1 + spark.kubernetes.executor.podTemplateContainerName=spark + spark.kubernetes.executor.podTemplateFile=/stackable/spark/conf/template.yaml + spark.kubernetes.executor.request.cores=1 + spark.kubernetes.namespace=__NAMESPACE__ + spark.metrics.conf=/stackable/spark/conf/metrics.properties + spark.ui.prometheus.enabled=true + template.yaml: | + metadata: + labels: + app.kubernetes.io/component: executor + app.kubernetes.io/instance: spark-connect + app.kubernetes.io/managed-by: spark.stackable.tech_connect + app.kubernetes.io/name: spark-connect + app.kubernetes.io/role-group: default + app.kubernetes.io/version: 4.1.1-stackable0.0.0-dev + stackable.tech/vendor: Stackable + spec: + affinity: + podAntiAffinity: + preferredDuringSchedulingIgnoredDuringExecution: + - podAffinityTerm: + labelSelector: + matchLabels: + app.kubernetes.io/component: executor + app.kubernetes.io/instance: spark-connect + app.kubernetes.io/name: spark-connect + topologyKey: kubernetes.io/hostname + weight: 70 + containers: + - env: + - name: CONTAINERDEBUG_LOG_DIRECTORY + value: /stackable/log/containerdebug + name: spark + volumeMounts: + - mountPath: /stackable/spark/conf + name: config + - mountPath: /stackable/log + name: log + - mountPath: /stackable/secrets/minio-credentials-class + name: minio-credentials-class-s3-credentials +{% if test_scenario['values']['s3-use-tls'] == 'true' %} + - mountPath: /stackable/secrets/minio-tls-ca + name: minio-tls-ca-ca-cert +{% endif %} + - mountPath: /stackable/truststore + name: stackable-truststore + - mountPath: /stackable/log_config + name: log-config + enableServiceLinks: false +{% if test_scenario['values']['s3-use-tls'] == 'true' %} + initContainers: + - command: + - /bin/bash + - -x + - -euo + - pipefail + - -c + - cert-tools generate-pkcs12-truststore --pem /etc/pki/ca-trust/extracted/pem/tls-ca-bundle.pem --out /stackable/truststore/truststore.p12 --out-password changeit && cert-tools generate-pkcs12-truststore --out /stackable/truststore/truststore.p12 --out-password changeit --pkcs12 /stackable/truststore/truststore.p12:changeit --pem /stackable/secrets/minio-tls-ca/ca.crt + image: oci.stackable.tech/sdp/spark-k8s:4.1.1-stackable0.0.0-dev + name: tls-truststore-init + resources: + limits: + cpu: 10m + memory: 128Mi + requests: + cpu: 10m + memory: 128Mi + volumeMounts: + - mountPath: /stackable/secrets/minio-credentials-class + name: minio-credentials-class-s3-credentials + - mountPath: /stackable/secrets/minio-tls-ca + name: minio-tls-ca-ca-cert + - mountPath: /stackable/truststore + name: stackable-truststore +{% endif %} + securityContext: + fsGroup: 1000 + volumes: + - emptyDir: + sizeLimit: 30Mi + name: log + - configMap: + name: spark-connect-executor + name: config + - ephemeral: + volumeClaimTemplate: + metadata: + annotations: + secrets.stackable.tech/class: minio-credentials-class + secrets.stackable.tech/provision-parts: public-private + spec: + accessModes: + - ReadWriteOnce + resources: + requests: + storage: '1' + storageClassName: secrets.stackable.tech + name: minio-credentials-class-s3-credentials +{% if test_scenario['values']['s3-use-tls'] == 'true' %} + - ephemeral: + volumeClaimTemplate: + metadata: + annotations: + secrets.stackable.tech/class: minio-tls-ca + secrets.stackable.tech/provision-parts: public + spec: + accessModes: + - ReadWriteOnce + resources: + requests: + storage: '1' + storageClassName: secrets.stackable.tech + name: minio-tls-ca-ca-cert +{% endif %} + - emptyDir: {} + name: stackable-truststore + - configMap: + name: spark-connect-log-config + name: log-config + YAMLEOF + ) + actual=$(kubectl -n $NAMESPACE get cm spark-connect-server -o yaml | yq -o=json '.data') + if [ "$expected" != "$actual" ]; then + echo "ERROR: ConfigMap spark-connect-server data drifted from snapshot." + echo "=== expected ===" + printf '%s\n' "$expected" + echo "=== actual ===" + printf '%s\n' "$actual" + exit 1 + fi + - script: | + expected=$(cat <<'YAMLEOF' | yq -o=json + metrics.properties: | + *.sink.prometheusServlet.class=org.apache.spark.metrics.sink.PrometheusServlet + *.sink.prometheusServlet.path=/metrics/prometheus + security.properties: | + networkaddress.cache.negative.ttl=0 + networkaddress.cache.ttl=30 + YAMLEOF + ) + actual=$(kubectl -n $NAMESPACE get cm spark-connect-executor -o yaml | yq -o=json '.data') + if [ "$expected" != "$actual" ]; then + echo "ERROR: ConfigMap spark-connect-executor data drifted from snapshot." + echo "=== expected ===" + printf '%s\n' "$expected" + echo "=== actual ===" + printf '%s\n' "$actual" + exit 1 + fi