From 241573c12d1ff76f4feb57cf53d8122c7ef68bdc Mon Sep 17 00:00:00 2001 From: Malte Sander Date: Tue, 19 May 2026 16:03:59 +0200 Subject: [PATCH 1/5] refactor: rename kafka_controller.rs -> controller.rs --- .../src/{kafka_controller.rs => controller.rs} | 0 rust/operator-binary/src/discovery.rs | 2 +- rust/operator-binary/src/main.rs | 10 +++++----- rust/operator-binary/src/operations/pdb.rs | 2 +- rust/operator-binary/src/resource/configmap.rs | 2 +- rust/operator-binary/src/resource/listener.rs | 2 +- rust/operator-binary/src/resource/service.rs | 2 +- rust/operator-binary/src/resource/statefulset.rs | 2 +- 8 files changed, 11 insertions(+), 11 deletions(-) rename rust/operator-binary/src/{kafka_controller.rs => controller.rs} (100%) diff --git a/rust/operator-binary/src/kafka_controller.rs b/rust/operator-binary/src/controller.rs similarity index 100% rename from rust/operator-binary/src/kafka_controller.rs rename to rust/operator-binary/src/controller.rs diff --git a/rust/operator-binary/src/discovery.rs b/rust/operator-binary/src/discovery.rs index e1603574..e0cc6b36 100644 --- a/rust/operator-binary/src/discovery.rs +++ b/rust/operator-binary/src/discovery.rs @@ -10,8 +10,8 @@ use stackable_operator::{ }; use crate::{ + controller::KAFKA_CONTROLLER_NAME, crd::{role::KafkaRole, security::KafkaTlsSecurity, v1alpha1}, - kafka_controller::KAFKA_CONTROLLER_NAME, utils::build_recommended_labels, }; diff --git a/rust/operator-binary/src/main.rs b/rust/operator-binary/src/main.rs index cd75209b..c074f8af 100644 --- a/rust/operator-binary/src/main.rs +++ b/rust/operator-binary/src/main.rs @@ -35,15 +35,15 @@ use stackable_operator::{ }; use crate::{ + controller::KAFKA_FULL_CONTROLLER_NAME, crd::{KafkaCluster, KafkaClusterVersion, OPERATOR_NAME, v1alpha1}, - kafka_controller::KAFKA_FULL_CONTROLLER_NAME, webhooks::conversion::create_webhook_server, }; mod config; +mod controller; mod crd; mod discovery; -mod kafka_controller; mod kerberos; mod operations; mod product_logging; @@ -183,9 +183,9 @@ async fn main() -> anyhow::Result<()> { ) .graceful_shutdown_on(sigterm_watcher.handle()) .run( - kafka_controller::reconcile_kafka, - kafka_controller::error_policy, - Arc::new(kafka_controller::Ctx { + controller::reconcile_kafka, + controller::error_policy, + Arc::new(controller::Ctx { client: client.clone(), operator_environment, product_config, diff --git a/rust/operator-binary/src/operations/pdb.rs b/rust/operator-binary/src/operations/pdb.rs index 18f46dc7..e42888ca 100644 --- a/rust/operator-binary/src/operations/pdb.rs +++ b/rust/operator-binary/src/operations/pdb.rs @@ -5,8 +5,8 @@ use stackable_operator::{ }; use crate::{ + controller::KAFKA_CONTROLLER_NAME, crd::{APP_NAME, OPERATOR_NAME, role::KafkaRole, v1alpha1}, - kafka_controller::KAFKA_CONTROLLER_NAME, }; #[derive(Snafu, Debug)] diff --git a/rust/operator-binary/src/resource/configmap.rs b/rust/operator-binary/src/resource/configmap.rs index 1c0f1386..473120b3 100644 --- a/rust/operator-binary/src/resource/configmap.rs +++ b/rust/operator-binary/src/resource/configmap.rs @@ -14,6 +14,7 @@ use stackable_operator::{ }; use crate::{ + controller::KAFKA_CONTROLLER_NAME, crd::{ JVM_SECURITY_PROPERTIES_FILE, KafkaPodDescriptor, MetadataManager, STACKABLE_LISTENER_BOOTSTRAP_DIR, STACKABLE_LISTENER_BROKER_DIR, @@ -26,7 +27,6 @@ use crate::{ security::KafkaTlsSecurity, v1alpha1, }, - kafka_controller::KAFKA_CONTROLLER_NAME, operations::graceful_shutdown::graceful_shutdown_config_properties, product_logging::extend_role_group_config_map, utils::build_recommended_labels, diff --git a/rust/operator-binary/src/resource/listener.rs b/rust/operator-binary/src/resource/listener.rs index 0caac124..4afde134 100644 --- a/rust/operator-binary/src/resource/listener.rs +++ b/rust/operator-binary/src/resource/listener.rs @@ -5,8 +5,8 @@ use stackable_operator::{ }; use crate::{ + controller::KAFKA_CONTROLLER_NAME, crd::{role::broker::BrokerConfig, security::KafkaTlsSecurity, v1alpha1}, - kafka_controller::KAFKA_CONTROLLER_NAME, utils::build_recommended_labels, }; diff --git a/rust/operator-binary/src/resource/service.rs b/rust/operator-binary/src/resource/service.rs index 69c33584..f430fc8e 100644 --- a/rust/operator-binary/src/resource/service.rs +++ b/rust/operator-binary/src/resource/service.rs @@ -8,8 +8,8 @@ use stackable_operator::{ }; use crate::{ + controller::KAFKA_CONTROLLER_NAME, crd::{APP_NAME, METRICS_PORT, METRICS_PORT_NAME, security::KafkaTlsSecurity, v1alpha1}, - kafka_controller::KAFKA_CONTROLLER_NAME, utils::build_recommended_labels, }; diff --git a/rust/operator-binary/src/resource/statefulset.rs b/rust/operator-binary/src/resource/statefulset.rs index 9725a8fd..5cb262f0 100644 --- a/rust/operator-binary/src/resource/statefulset.rs +++ b/rust/operator-binary/src/resource/statefulset.rs @@ -48,6 +48,7 @@ use crate::{ command::{broker_kafka_container_commands, controller_kafka_container_command}, node_id_hasher::node_id_hash32_offset, }, + controller::KAFKA_CONTROLLER_NAME, crd::{ self, APP_NAME, KAFKA_HEAP_OPTS, LISTENER_BOOTSTRAP_VOLUME_NAME, LISTENER_BROKER_VOLUME_NAME, LOG_DIRS_VOLUME_NAME, METRICS_PORT, METRICS_PORT_NAME, @@ -60,7 +61,6 @@ use crate::{ security::KafkaTlsSecurity, v1alpha1, }, - kafka_controller::KAFKA_CONTROLLER_NAME, kerberos::add_kerberos_pod_config, operations::graceful_shutdown::add_graceful_shutdown_config, product_logging::{ From 3f0502f719b2e11b437a5f54fe943ae320014422 Mon Sep 17 00:00:00 2001 From: Malte Sander Date: Tue, 19 May 2026 16:14:10 +0200 Subject: [PATCH 2/5] refactor: add dereference and validate steps --- rust/operator-binary/src/controller.rs | 213 ++++-------------- .../src/controller/dereference.rs | 63 ++++++ .../src/controller/validate.rs | 182 +++++++++++++++ .../operator-binary/src/crd/authentication.rs | 13 +- rust/operator-binary/src/crd/security.rs | 29 +-- 5 files changed, 301 insertions(+), 199 deletions(-) create mode 100644 rust/operator-binary/src/controller/dereference.rs create mode 100644 rust/operator-binary/src/controller/validate.rs diff --git a/rust/operator-binary/src/controller.rs b/rust/operator-binary/src/controller.rs index c62cf520..5714ef9b 100644 --- a/rust/operator-binary/src/controller.rs +++ b/rust/operator-binary/src/controller.rs @@ -1,17 +1,14 @@ //! Ensures that `Pod`s are configured and running for each [`v1alpha1::KafkaCluster`]. -use std::{collections::HashMap, str::FromStr, sync::Arc}; +use std::{str::FromStr, sync::Arc}; use const_format::concatcp; -use product_config::{ProductConfigManager, types::PropertyNameKind}; +use product_config::ProductConfigManager; use snafu::{ResultExt, Snafu}; use stackable_operator::{ cli::OperatorEnvironmentOptions, cluster_resources::{ClusterResourceApplyStrategy, ClusterResources}, - commons::{ - product_image_selection::{self}, - rbac::build_rbac_resources, - }, + commons::rbac::build_rbac_resources, crd::listener, kube::{ Resource, @@ -20,10 +17,6 @@ use stackable_operator::{ runtime::{controller::Action, reflector::ObjectRef}, }, logging::controller::ReconcilerError, - product_config_utils::{ - ValidatedRoleConfigByPropertyKind, transform_all_roles_to_config, - validate_all_roles_and_groups_config, - }, role_utils::{GenericRoleConfig, RoleGroupRef}, shared::time::Duration, status::condition::{ @@ -33,16 +26,14 @@ use stackable_operator::{ }; use strum::{EnumDiscriminants, IntoStaticStr}; +mod dereference; +mod validate; + use crate::{ crd::{ - self, APP_NAME, CONTAINER_IMAGE_BASE_NAME, JVM_SECURITY_PROPERTIES_FILE, - KafkaClusterStatus, OPERATOR_NAME, authorization, + self, APP_NAME, KafkaClusterStatus, OPERATOR_NAME, listener::get_kafka_listener_config, - role::{ - AnyConfig, KafkaRole, broker::BROKER_PROPERTIES_FILE, - controller::CONTROLLER_PROPERTIES_FILE, - }, - security::KafkaTlsSecurity, + role::{AnyConfig, KafkaRole}, v1alpha1, }, discovery::{self, build_discovery_configmap}, @@ -68,6 +59,12 @@ pub struct Ctx { #[strum_discriminants(derive(IntoStaticStr))] #[allow(clippy::enum_variant_names)] pub enum Error { + #[snafu(display("failed to dereference resources"))] + Dereference { source: dereference::Error }, + + #[snafu(display("failed to validate cluster"))] + ValidateCluster { source: validate::Error }, + #[snafu(display("failed to build pod descriptors"))] BuildPodDescriptors { source: crate::crd::Error }, @@ -76,12 +73,6 @@ pub enum Error { source: crate::crd::listener::KafkaListenerError, }, - #[snafu(display("cluster object defines no '{role}' role"))] - MissingKafkaRole { - source: crate::crd::Error, - role: KafkaRole, - }, - #[snafu(display("failed to apply role Service"))] ApplyRoleService { source: stackable_operator::cluster_resources::Error, @@ -105,16 +96,6 @@ pub enum Error { rolegroup: RoleGroupRef, }, - #[snafu(display("failed to generate product config"))] - GenerateProductConfig { - source: stackable_operator::product_config_utils::Error, - }, - - #[snafu(display("invalid product config"))] - InvalidProductConfig { - source: stackable_operator::product_config_utils::Error, - }, - #[snafu(display("failed to build discovery ConfigMap"))] BuildDiscoveryConfig { source: discovery::Error }, @@ -128,9 +109,6 @@ pub enum Error { source: stackable_operator::cluster_resources::Error, }, - #[snafu(display("failed to initialize security context"))] - FailedToInitializeSecurityContext { source: crate::crd::security::Error }, - #[snafu(display("failed to create cluster resources"))] CreateClusterResources { source: stackable_operator::cluster_resources::Error, @@ -170,9 +148,6 @@ pub enum Error { stackable_operator::kvp::KeyValuePairError, }, - #[snafu(display("failed to validate authentication method"))] - FailedToValidateAuthenticationMethod { source: crate::crd::security::Error }, - #[snafu(display("KafkaCluster object is invalid"))] InvalidKafkaCluster { source: error_boundary::InvalidObject, @@ -181,11 +156,6 @@ pub enum Error { #[snafu(display("KafkaCluster object is misconfigured"))] MisconfiguredKafkaCluster { source: crd::Error }, - #[snafu(display("failed to resolve product image"))] - ResolveProductImage { - source: product_image_selection::Error, - }, - #[snafu(display("failed to parse role: {source}"))] ParseRole { source: strum::ParseError }, @@ -208,9 +178,6 @@ pub enum Error { BuildListener { source: crate::resource::listener::Error, }, - - #[snafu(display("object defines no namespace"))] - GetOpaConfig { source: authorization::Error }, } type Result = std::result::Result; @@ -221,17 +188,15 @@ impl ReconcilerError for Error { fn secondary_object(&self) -> Option> { match self { - Error::MissingKafkaRole { .. } => None, + Error::Dereference { .. } => None, + Error::ValidateCluster { .. } => None, Error::ApplyRoleService { .. } => None, Error::ApplyRoleGroupService { .. } => None, Error::ApplyRoleGroupConfig { .. } => None, Error::ApplyRoleGroupStatefulSet { .. } => None, - Error::GenerateProductConfig { .. } => None, - Error::InvalidProductConfig { .. } => None, Error::BuildDiscoveryConfig { .. } => None, Error::ApplyDiscoveryConfig { .. } => None, Error::DeleteOrphans { .. } => None, - Error::FailedToInitializeSecurityContext { .. } => None, Error::CreateClusterResources { .. } => None, Error::FailedToResolveConfig { .. } => None, Error::ApplyServiceAccount { .. } => None, @@ -240,10 +205,8 @@ impl ReconcilerError for Error { Error::BuildRbacResources { .. } => None, Error::FailedToCreatePdb { .. } => None, Error::GetRequiredLabels { .. } => None, - Error::FailedToValidateAuthenticationMethod { .. } => None, Error::InvalidKafkaCluster { .. } => None, Error::MisconfiguredKafkaCluster { .. } => None, - Error::ResolveProductImage { .. } => None, Error::ParseRole { .. } => None, Error::BuildStatefulset { .. } => None, Error::BuildConfigMap { .. } => None, @@ -251,7 +214,6 @@ impl ReconcilerError for Error { Error::BuildListener { .. } => None, Error::InvalidKafkaListeners { .. } => None, Error::BuildPodDescriptors { .. } => None, - Error::GetOpaConfig { .. } => None, } } } @@ -270,15 +232,28 @@ pub async fn reconcile_kafka( let client = &ctx.client; - let resolved_product_image = kafka - .spec - .image - .resolve( - CONTAINER_IMAGE_BASE_NAME, - &ctx.operator_environment.image_repository, - crate::built_info::PKG_VERSION, - ) - .context(ResolveProductImageSnafu)?; + // dereference (client required) + let dereferenced_objects = dereference::dereference(client, kafka) + .await + .context(DereferenceSnafu)?; + + // validate (no client required) + let validate::ValidatedInputs { + resolved_product_image, + kafka_security, + validated_role_config: validated_config, + } = validate::validate( + kafka, + &dereferenced_objects, + &ctx.operator_environment, + &ctx.product_config, + ) + .context(ValidateClusterSnafu)?; + + let opa_connect = dereferenced_objects + .resolved_authorization_config + .as_ref() + .map(|auth_config| auth_config.opa_connect.clone()); let mut cluster_resources = ClusterResources::new( APP_NAME, @@ -290,37 +265,6 @@ pub async fn reconcile_kafka( ) .context(CreateClusterResourcesSnafu)?; - let validated_config = validated_product_config( - kafka, - &resolved_product_image.product_version, - &ctx.product_config, - )?; - - // Assemble the OPA connection string from the discovery and the given path if provided - // Will be passed as --override parameter in the cli in the stateful set - let opa_config = &kafka - .spec - .cluster_config - .authorization - .clone() - .get_opa_config(client, kafka) - .await - .context(GetOpaConfigSnafu)?; - - let opa_connect = opa_config - .as_ref() - .map(|auth_config| auth_config.opa_connect.clone()); - - let opa_secret_class = if let Some(opa_config) = opa_config.as_ref() { - opa_config.secret_class.to_owned() - } else { - None - }; - - let kafka_security = KafkaTlsSecurity::new_from_kafka_cluster(client, kafka, opa_secret_class) - .await - .context(FailedToInitializeSecurityContextSnafu)?; - tracing::debug!( kerberos_enabled = kafka_security.has_kerberos_enabled(), kerberos_secret_class = ?kafka_security.kerberos_secret_class(), @@ -329,10 +273,6 @@ pub async fn reconcile_kafka( "The following security settings are used" ); - kafka_security - .validate_authentication_methods() - .context(FailedToValidateAuthenticationMethodSnafu)?; - let mut ss_cond_builder = StatefulSetConditionBuilder::default(); let (rbac_sa, rbac_rolebinding) = build_rbac_resources( @@ -537,80 +477,3 @@ pub fn error_policy( _ => Action::requeue(*Duration::from_secs(5)), } } - -/// Defines all required roles and their required configuration. -/// -/// The roles and their configs are then validated and complemented by the product config. -/// -/// # Arguments -/// * `resource` - The TrinoCluster containing the role definitions. -/// * `version` - The TrinoCluster version. -/// * `product_config` - The product config to validate and complement the user config. -/// -fn validated_product_config( - kafka: &v1alpha1::KafkaCluster, - product_version: &str, - product_config: &ProductConfigManager, -) -> Result { - let mut role_config = HashMap::new(); - - let broker_role = [( - KafkaRole::Broker.to_string(), - ( - vec![ - PropertyNameKind::File(BROKER_PROPERTIES_FILE.to_string()), - PropertyNameKind::File(JVM_SECURITY_PROPERTIES_FILE.to_string()), - PropertyNameKind::Env, - ], - kafka - .broker_role() - .cloned() - .context(MissingKafkaRoleSnafu { - role: KafkaRole::Broker, - })? - .erase(), - ), - )] - .into(); - - let broker_role_config = - transform_all_roles_to_config(kafka, &broker_role).context(GenerateProductConfigSnafu)?; - - role_config.extend(broker_role_config); - - // TODO: need this if because controller_role() raises an error - if kafka.spec.controllers.is_some() { - let controller_role = [( - KafkaRole::Controller.to_string(), - ( - vec![ - PropertyNameKind::File(CONTROLLER_PROPERTIES_FILE.to_string()), - PropertyNameKind::File(JVM_SECURITY_PROPERTIES_FILE.to_string()), - PropertyNameKind::Env, - ], - kafka - .controller_role() - .cloned() - .context(MissingKafkaRoleSnafu { - role: KafkaRole::Controller, - })? - .erase(), - ), - )] - .into(); - - let controller_role_config = transform_all_roles_to_config(kafka, &controller_role) - .context(GenerateProductConfigSnafu)?; - - role_config.extend(controller_role_config); - } - - validate_all_roles_and_groups_config( - product_version, - &role_config, - product_config, - false, - false, - ) - .context(InvalidProductConfigSnafu) -} diff --git a/rust/operator-binary/src/controller/dereference.rs b/rust/operator-binary/src/controller/dereference.rs new file mode 100644 index 00000000..645e4270 --- /dev/null +++ b/rust/operator-binary/src/controller/dereference.rs @@ -0,0 +1,63 @@ +//! The dereference step in the KafkaCluster controller. +//! +//! Fetches all Kubernetes objects referenced by the [`v1alpha1::KafkaCluster`] spec and returns +//! them in [`DereferencedObjects`]. This step only performs I/O; validation of the fetched +//! objects (constraints on which auth class providers are supported, kerberos + TLS +//! compatibility, etc.) happens in the validate step. +//! +//! `KafkaAuthorization::get_opa_config` is a pure fetch + URL assembly (no validation to peel off) +//! and stays here as-is. + +use snafu::{ResultExt, Snafu}; +use stackable_operator::client::Client; + +use crate::crd::{ + authentication::{self, ResolvedAuthenticationClasses}, + authorization::{self, KafkaAuthorizationConfig}, + v1alpha1, +}; + +#[derive(Snafu, Debug)] +pub enum Error { + #[snafu(display("failed to fetch authentication classes"))] + FetchAuthenticationClasses { source: authentication::Error }, + + #[snafu(display("failed to get OPA config"))] + GetOpaConfig { source: authorization::Error }, +} + +type Result = std::result::Result; + +/// Kubernetes objects referenced from the [`v1alpha1::KafkaCluster`] spec, already fetched but +/// not yet validated. +pub struct DereferencedObjects { + pub resolved_authentication_classes: ResolvedAuthenticationClasses, + pub resolved_authorization_config: Option, +} + +/// Fetches all Kubernetes objects referenced from the [`v1alpha1::KafkaCluster`] spec. +pub async fn dereference( + client: &Client, + kafka: &v1alpha1::KafkaCluster, +) -> Result { + let resolved_authentication_classes = ResolvedAuthenticationClasses::fetch_references( + client, + &kafka.spec.cluster_config.authentication, + ) + .await + .context(FetchAuthenticationClassesSnafu)?; + + let resolved_authorization_config = kafka + .spec + .cluster_config + .authorization + .clone() + .get_opa_config(client, kafka) + .await + .context(GetOpaConfigSnafu)?; + + Ok(DereferencedObjects { + resolved_authentication_classes, + resolved_authorization_config, + }) +} diff --git a/rust/operator-binary/src/controller/validate.rs b/rust/operator-binary/src/controller/validate.rs new file mode 100644 index 00000000..19b229c7 --- /dev/null +++ b/rust/operator-binary/src/controller/validate.rs @@ -0,0 +1,182 @@ +//! The validate step in the KafkaCluster controller. +//! +//! Synchronously validates inputs that don't require a Kubernetes client. Produces +//! [`ValidatedInputs`], consumed by the rest of `reconcile_kafka`. + +use std::collections::HashMap; + +use product_config::{ProductConfigManager, types::PropertyNameKind}; +use snafu::{ResultExt, Snafu}; +use stackable_operator::{ + cli::OperatorEnvironmentOptions, + commons::product_image_selection::{self, ResolvedProductImage}, + product_config_utils::{ + ValidatedRoleConfigByPropertyKind, transform_all_roles_to_config, + validate_all_roles_and_groups_config, + }, +}; + +use crate::{ + controller::dereference::DereferencedObjects, + crd::{ + self, CONTAINER_IMAGE_BASE_NAME, JVM_SECURITY_PROPERTIES_FILE, + authentication::{self}, + role::{KafkaRole, broker::BROKER_PROPERTIES_FILE, controller::CONTROLLER_PROPERTIES_FILE}, + security::{self, KafkaTlsSecurity}, + v1alpha1, + }, +}; + +#[derive(Snafu, Debug)] +pub enum Error { + #[snafu(display("failed to resolve product image"))] + ResolveProductImage { + source: product_image_selection::Error, + }, + + #[snafu(display("failed to validate authentication classes"))] + InvalidAuthenticationClassConfiguration { source: authentication::Error }, + + #[snafu(display("failed to validate authentication method"))] + FailedToValidateAuthenticationMethod { source: security::Error }, + + #[snafu(display("cluster object defines no '{role}' role"))] + MissingKafkaRole { source: crd::Error, role: KafkaRole }, + + #[snafu(display("failed to generate product config"))] + GenerateProductConfig { + source: stackable_operator::product_config_utils::Error, + }, + + #[snafu(display("invalid product config"))] + InvalidProductConfig { + source: stackable_operator::product_config_utils::Error, + }, +} + +type Result = std::result::Result; + +/// Synchronous inputs the rest of `reconcile_kafka` needs after dereferencing. +pub struct ValidatedInputs { + pub resolved_product_image: ResolvedProductImage, + pub kafka_security: KafkaTlsSecurity, + pub validated_role_config: ValidatedRoleConfigByPropertyKind, +} + +/// Validates the cluster spec and the dereferenced inputs. +pub fn validate( + kafka: &v1alpha1::KafkaCluster, + dereferenced_objects: &DereferencedObjects, + operator_environment: &OperatorEnvironmentOptions, + product_config: &ProductConfigManager, +) -> Result { + let resolved_product_image = kafka + .spec + .image + .resolve( + CONTAINER_IMAGE_BASE_NAME, + &operator_environment.image_repository, + crate::built_info::PKG_VERSION, + ) + .context(ResolveProductImageSnafu)?; + + let resolved_authentication_classes = dereferenced_objects + .resolved_authentication_classes + .validate() + .context(InvalidAuthenticationClassConfigurationSnafu)?; + + let opa_secret_class = dereferenced_objects + .resolved_authorization_config + .as_ref() + .and_then(|cfg| cfg.secret_class.clone()); + + let kafka_security = KafkaTlsSecurity::new_from_kafka_cluster( + kafka, + resolved_authentication_classes, + opa_secret_class, + ); + + kafka_security + .validate_authentication_methods() + .context(FailedToValidateAuthenticationMethodSnafu)?; + + let validated_role_config = validated_product_config( + kafka, + &resolved_product_image.product_version, + product_config, + )?; + + Ok(ValidatedInputs { + resolved_product_image, + kafka_security, + validated_role_config, + }) +} + +fn validated_product_config( + kafka: &v1alpha1::KafkaCluster, + product_version: &str, + product_config: &ProductConfigManager, +) -> Result { + let mut role_config = HashMap::new(); + + let broker_role = [( + KafkaRole::Broker.to_string(), + ( + vec![ + PropertyNameKind::File(BROKER_PROPERTIES_FILE.to_string()), + PropertyNameKind::File(JVM_SECURITY_PROPERTIES_FILE.to_string()), + PropertyNameKind::Env, + ], + kafka + .broker_role() + .cloned() + .context(MissingKafkaRoleSnafu { + role: KafkaRole::Broker, + })? + .erase(), + ), + )] + .into(); + + let broker_role_config = + transform_all_roles_to_config(kafka, &broker_role).context(GenerateProductConfigSnafu)?; + + role_config.extend(broker_role_config); + + // TODO: need this if because controller_role() raises an error + if kafka.spec.controllers.is_some() { + let controller_role = [( + KafkaRole::Controller.to_string(), + ( + vec![ + PropertyNameKind::File(CONTROLLER_PROPERTIES_FILE.to_string()), + PropertyNameKind::File(JVM_SECURITY_PROPERTIES_FILE.to_string()), + PropertyNameKind::Env, + ], + kafka + .controller_role() + .cloned() + .context(MissingKafkaRoleSnafu { + role: KafkaRole::Controller, + })? + .erase(), + ), + )] + .into(); + + let controller_role_config = transform_all_roles_to_config(kafka, &controller_role) + .context(GenerateProductConfigSnafu)?; + + role_config.extend(controller_role_config); + } + + validate_all_roles_and_groups_config( + product_version, + &role_config, + product_config, + false, + false, + ) + .context(InvalidProductConfigSnafu) +} diff --git a/rust/operator-binary/src/crd/authentication.rs b/rust/operator-binary/src/crd/authentication.rs index 35d43a94..a1d41cf2 100644 --- a/rust/operator-binary/src/crd/authentication.rs +++ b/rust/operator-binary/src/crd/authentication.rs @@ -66,11 +66,10 @@ impl ResolvedAuthenticationClasses { } } - /// Resolve provided AuthenticationClasses via API calls and validate the contents. - /// Currently errors out if: - /// - AuthenticationClass could not be resolved - /// - Validation failed - pub async fn from_references( + /// Fetch the referenced AuthenticationClasses from the Kubernetes API without validating them. + /// + /// Call [`Self::validate`] on the result to enforce the constraints documented there. + pub async fn fetch_references( client: &Client, auth_classes: &Vec, ) -> Result { @@ -91,7 +90,9 @@ impl ResolvedAuthenticationClasses { ); } - ResolvedAuthenticationClasses::new(resolved_authentication_classes).validate() + Ok(ResolvedAuthenticationClasses::new( + resolved_authentication_classes, + )) } /// Return the (first) TLS `AuthenticationClass` if available diff --git a/rust/operator-binary/src/crd/security.rs b/rust/operator-binary/src/crd/security.rs index b38f32a9..334aeda8 100644 --- a/rust/operator-binary/src/crd/security.rs +++ b/rust/operator-binary/src/crd/security.rs @@ -16,7 +16,6 @@ use stackable_operator::{ volume::{SecretFormat, SecretOperatorVolumeSourceBuilder, VolumeBuilder}, }, }, - client::Client, commons::secret_class::SecretClassVolumeProvisionParts, crd::authentication::core, k8s_openapi::api::core::v1::Volume, @@ -27,7 +26,7 @@ use super::listener::KafkaListenerProtocol; use crate::crd::{ LISTENER_BOOTSTRAP_VOLUME_NAME, LISTENER_BROKER_VOLUME_NAME, STACKABLE_KERBEROS_KRB5_PATH, STACKABLE_LISTENER_BROKER_DIR, - authentication::{self, ResolvedAuthenticationClasses}, + authentication::ResolvedAuthenticationClasses, listener::{self, KafkaListenerName, node_address_cmd_env, node_port_cmd_env}, role::KafkaRole, tls, v1alpha1, @@ -35,9 +34,6 @@ use crate::crd::{ #[derive(Snafu, Debug)] pub enum Error { - #[snafu(display("failed to process authentication class"))] - InvalidAuthenticationClassConfiguration { source: authentication::Error }, - #[snafu(display("failed to build the secret operator Volume"))] SecretVolumeBuild { source: stackable_operator::builder::pod::volume::SecretOperatorVolumeSourceBuilderError, @@ -114,20 +110,17 @@ impl KafkaTlsSecurity { } } - /// Create a `KafkaSecurity` struct from the Kafka custom resource and resolve - /// all provided `AuthenticationClass` references. - pub async fn new_from_kafka_cluster( - client: &Client, + /// Build a [`KafkaTlsSecurity`] from already-resolved authentication classes. + /// + /// The async retrieval of [`ResolvedAuthenticationClasses`] now happens in the dereference + /// step of the controller; this constructor only reads TLS settings from the spec. + pub fn new_from_kafka_cluster( kafka: &v1alpha1::KafkaCluster, + resolved_authentication_classes: ResolvedAuthenticationClasses, opa_secret_class: Option, - ) -> Result { - Ok(KafkaTlsSecurity { - resolved_authentication_classes: ResolvedAuthenticationClasses::from_references( - client, - &kafka.spec.cluster_config.authentication, - ) - .await - .context(InvalidAuthenticationClassConfigurationSnafu)?, + ) -> Self { + KafkaTlsSecurity { + resolved_authentication_classes, internal_secret_class: kafka .spec .cluster_config @@ -142,7 +135,7 @@ impl KafkaTlsSecurity { .as_ref() .and_then(|tls| tls.server_secret_class.clone()), opa_secret_class, - }) + } } /// Check if TLS encryption is enabled. This could be due to: From 92f6afc3d99b415c54795f48f3906dd5d7c7cab4 Mon Sep 17 00:00:00 2001 From: Malte Sander Date: Tue, 19 May 2026 16:20:22 +0200 Subject: [PATCH 3/5] fix: update crate hashes --- Cargo.nix | 18 +++++++++--------- crate-hashes.json | 18 +++++++++--------- 2 files changed, 18 insertions(+), 18 deletions(-) diff --git a/Cargo.nix b/Cargo.nix index 75415d97..ff2e1a37 100644 --- a/Cargo.nix +++ b/Cargo.nix @@ -4844,7 +4844,7 @@ rec { src = pkgs.fetchgit { url = "https://github.com/stackabletech/operator-rs.git"; rev = "7a5f0c3fbcd091340214a23f0607fcd4b4fcc152"; - sha256 = "0d58yvxvy8hbai12bjhcyvh4zw182j5dsfyqja4k2xc1vzjy29by"; + sha256 = "0lj969rjbxairjglrnaq0xhabvdrq5nd6wl1i0y9pr50nhh7zvgk"; }; libName = "k8s_version"; authors = [ @@ -9518,7 +9518,7 @@ rec { src = pkgs.fetchgit { url = "https://github.com/stackabletech/operator-rs.git"; rev = "7a5f0c3fbcd091340214a23f0607fcd4b4fcc152"; - sha256 = "0d58yvxvy8hbai12bjhcyvh4zw182j5dsfyqja4k2xc1vzjy29by"; + sha256 = "0lj969rjbxairjglrnaq0xhabvdrq5nd6wl1i0y9pr50nhh7zvgk"; }; libName = "stackable_certs"; authors = [ @@ -9713,7 +9713,7 @@ rec { src = pkgs.fetchgit { url = "https://github.com/stackabletech/operator-rs.git"; rev = "7a5f0c3fbcd091340214a23f0607fcd4b4fcc152"; - sha256 = "0d58yvxvy8hbai12bjhcyvh4zw182j5dsfyqja4k2xc1vzjy29by"; + sha256 = "0lj969rjbxairjglrnaq0xhabvdrq5nd6wl1i0y9pr50nhh7zvgk"; }; libName = "stackable_operator"; authors = [ @@ -9893,7 +9893,7 @@ rec { src = pkgs.fetchgit { url = "https://github.com/stackabletech/operator-rs.git"; rev = "7a5f0c3fbcd091340214a23f0607fcd4b4fcc152"; - sha256 = "0d58yvxvy8hbai12bjhcyvh4zw182j5dsfyqja4k2xc1vzjy29by"; + sha256 = "0lj969rjbxairjglrnaq0xhabvdrq5nd6wl1i0y9pr50nhh7zvgk"; }; procMacro = true; libName = "stackable_operator_derive"; @@ -9928,7 +9928,7 @@ rec { src = pkgs.fetchgit { url = "https://github.com/stackabletech/operator-rs.git"; rev = "7a5f0c3fbcd091340214a23f0607fcd4b4fcc152"; - sha256 = "0d58yvxvy8hbai12bjhcyvh4zw182j5dsfyqja4k2xc1vzjy29by"; + sha256 = "0lj969rjbxairjglrnaq0xhabvdrq5nd6wl1i0y9pr50nhh7zvgk"; }; libName = "stackable_shared"; authors = [ @@ -10009,7 +10009,7 @@ rec { src = pkgs.fetchgit { url = "https://github.com/stackabletech/operator-rs.git"; rev = "7a5f0c3fbcd091340214a23f0607fcd4b4fcc152"; - sha256 = "0d58yvxvy8hbai12bjhcyvh4zw182j5dsfyqja4k2xc1vzjy29by"; + sha256 = "0lj969rjbxairjglrnaq0xhabvdrq5nd6wl1i0y9pr50nhh7zvgk"; }; libName = "stackable_telemetry"; authors = [ @@ -10119,7 +10119,7 @@ rec { src = pkgs.fetchgit { url = "https://github.com/stackabletech/operator-rs.git"; rev = "7a5f0c3fbcd091340214a23f0607fcd4b4fcc152"; - sha256 = "0d58yvxvy8hbai12bjhcyvh4zw182j5dsfyqja4k2xc1vzjy29by"; + sha256 = "0lj969rjbxairjglrnaq0xhabvdrq5nd6wl1i0y9pr50nhh7zvgk"; }; libName = "stackable_versioned"; authors = [ @@ -10169,7 +10169,7 @@ rec { src = pkgs.fetchgit { url = "https://github.com/stackabletech/operator-rs.git"; rev = "7a5f0c3fbcd091340214a23f0607fcd4b4fcc152"; - sha256 = "0d58yvxvy8hbai12bjhcyvh4zw182j5dsfyqja4k2xc1vzjy29by"; + sha256 = "0lj969rjbxairjglrnaq0xhabvdrq5nd6wl1i0y9pr50nhh7zvgk"; }; procMacro = true; libName = "stackable_versioned_macros"; @@ -10237,7 +10237,7 @@ rec { src = pkgs.fetchgit { url = "https://github.com/stackabletech/operator-rs.git"; rev = "7a5f0c3fbcd091340214a23f0607fcd4b4fcc152"; - sha256 = "0d58yvxvy8hbai12bjhcyvh4zw182j5dsfyqja4k2xc1vzjy29by"; + sha256 = "0lj969rjbxairjglrnaq0xhabvdrq5nd6wl1i0y9pr50nhh7zvgk"; }; libName = "stackable_webhook"; authors = [ diff --git a/crate-hashes.json b/crate-hashes.json index a6396ca0..86f2b840 100644 --- a/crate-hashes.json +++ b/crate-hashes.json @@ -1,12 +1,12 @@ { - "git+https://github.com/stackabletech/operator-rs.git?tag=stackable-operator-0.111.1#k8s-version@0.1.3": "0d58yvxvy8hbai12bjhcyvh4zw182j5dsfyqja4k2xc1vzjy29by", - "git+https://github.com/stackabletech/operator-rs.git?tag=stackable-operator-0.111.1#stackable-certs@0.4.0": "0d58yvxvy8hbai12bjhcyvh4zw182j5dsfyqja4k2xc1vzjy29by", - "git+https://github.com/stackabletech/operator-rs.git?tag=stackable-operator-0.111.1#stackable-operator-derive@0.3.1": "0d58yvxvy8hbai12bjhcyvh4zw182j5dsfyqja4k2xc1vzjy29by", - "git+https://github.com/stackabletech/operator-rs.git?tag=stackable-operator-0.111.1#stackable-operator@0.111.1": "0d58yvxvy8hbai12bjhcyvh4zw182j5dsfyqja4k2xc1vzjy29by", - "git+https://github.com/stackabletech/operator-rs.git?tag=stackable-operator-0.111.1#stackable-shared@0.1.0": "0d58yvxvy8hbai12bjhcyvh4zw182j5dsfyqja4k2xc1vzjy29by", - "git+https://github.com/stackabletech/operator-rs.git?tag=stackable-operator-0.111.1#stackable-telemetry@0.6.3": "0d58yvxvy8hbai12bjhcyvh4zw182j5dsfyqja4k2xc1vzjy29by", - "git+https://github.com/stackabletech/operator-rs.git?tag=stackable-operator-0.111.1#stackable-versioned-macros@0.10.0": "0d58yvxvy8hbai12bjhcyvh4zw182j5dsfyqja4k2xc1vzjy29by", - "git+https://github.com/stackabletech/operator-rs.git?tag=stackable-operator-0.111.1#stackable-versioned@0.10.0": "0d58yvxvy8hbai12bjhcyvh4zw182j5dsfyqja4k2xc1vzjy29by", - "git+https://github.com/stackabletech/operator-rs.git?tag=stackable-operator-0.111.1#stackable-webhook@0.9.1": "0d58yvxvy8hbai12bjhcyvh4zw182j5dsfyqja4k2xc1vzjy29by", + "git+https://github.com/stackabletech/operator-rs.git?tag=stackable-operator-0.111.1#k8s-version@0.1.3": "0lj969rjbxairjglrnaq0xhabvdrq5nd6wl1i0y9pr50nhh7zvgk", + "git+https://github.com/stackabletech/operator-rs.git?tag=stackable-operator-0.111.1#stackable-certs@0.4.0": "0lj969rjbxairjglrnaq0xhabvdrq5nd6wl1i0y9pr50nhh7zvgk", + "git+https://github.com/stackabletech/operator-rs.git?tag=stackable-operator-0.111.1#stackable-operator-derive@0.3.1": "0lj969rjbxairjglrnaq0xhabvdrq5nd6wl1i0y9pr50nhh7zvgk", + "git+https://github.com/stackabletech/operator-rs.git?tag=stackable-operator-0.111.1#stackable-operator@0.111.1": "0lj969rjbxairjglrnaq0xhabvdrq5nd6wl1i0y9pr50nhh7zvgk", + "git+https://github.com/stackabletech/operator-rs.git?tag=stackable-operator-0.111.1#stackable-shared@0.1.0": "0lj969rjbxairjglrnaq0xhabvdrq5nd6wl1i0y9pr50nhh7zvgk", + "git+https://github.com/stackabletech/operator-rs.git?tag=stackable-operator-0.111.1#stackable-telemetry@0.6.3": "0lj969rjbxairjglrnaq0xhabvdrq5nd6wl1i0y9pr50nhh7zvgk", + "git+https://github.com/stackabletech/operator-rs.git?tag=stackable-operator-0.111.1#stackable-versioned-macros@0.10.0": "0lj969rjbxairjglrnaq0xhabvdrq5nd6wl1i0y9pr50nhh7zvgk", + "git+https://github.com/stackabletech/operator-rs.git?tag=stackable-operator-0.111.1#stackable-versioned@0.10.0": "0lj969rjbxairjglrnaq0xhabvdrq5nd6wl1i0y9pr50nhh7zvgk", + "git+https://github.com/stackabletech/operator-rs.git?tag=stackable-operator-0.111.1#stackable-webhook@0.9.1": "0lj969rjbxairjglrnaq0xhabvdrq5nd6wl1i0y9pr50nhh7zvgk", "git+https://github.com/stackabletech/product-config.git?tag=0.8.0#product-config@0.8.0": "1dz70kapm2wdqcr7ndyjji0lhsl98bsq95gnb2lw487wf6yr7987" } \ No newline at end of file From 259abda9e4086e05fcb308c375492116a2ee56a7 Mon Sep 17 00:00:00 2001 From: Malte Sander Date: Tue, 19 May 2026 17:19:25 +0200 Subject: [PATCH 4/5] feat: add smoke snapshot test --- tests/templates/kuttl/smoke/30-assert.yaml.j2 | 91 +---- tests/templates/kuttl/smoke/31-assert.yaml | 11 +- tests/templates/kuttl/smoke/33-assert.yaml.j2 | 316 ++++++++++++++++++ tests/templates/kuttl/smoke/34-assert.yaml.j2 | 109 ++++++ 4 files changed, 432 insertions(+), 95 deletions(-) create mode 100644 tests/templates/kuttl/smoke/33-assert.yaml.j2 create mode 100644 tests/templates/kuttl/smoke/34-assert.yaml.j2 diff --git a/tests/templates/kuttl/smoke/30-assert.yaml.j2 b/tests/templates/kuttl/smoke/30-assert.yaml.j2 index a45a369e..4b8b0810 100644 --- a/tests/templates/kuttl/smoke/30-assert.yaml.j2 +++ b/tests/templates/kuttl/smoke/30-assert.yaml.j2 @@ -1,91 +1,6 @@ --- apiVersion: kuttl.dev/v1beta1 kind: TestAssert -timeout: 300 ---- -apiVersion: apps/v1 -kind: StatefulSet -metadata: - name: test-kafka-broker-default - generation: 1 # There should be no unneeded Pod restarts - labels: - restarter.stackable.tech/enabled: "true" -spec: - template: - spec: - containers: - - name: kafka - resources: - limits: - cpu: 1100m # From podOverrides - memory: 2Gi - requests: - cpu: 300m # From podOverrides - memory: 2Gi - - name: kcat-prober - resources: - limits: - cpu: 200m - memory: 128Mi - requests: - cpu: 100m - memory: 128Mi -{% if lookup('env', 'VECTOR_AGGREGATOR') %} - - name: vector - resources: - limits: - cpu: 500m - memory: 128Mi - requests: - cpu: 250m - memory: 128Mi -{% endif %} - terminationGracePeriodSeconds: 1800 - volumes: -{% if test_scenario['values']['use-client-tls'] == 'true' %} - - name: tls-kcat - ephemeral: - volumeClaimTemplate: - metadata: - annotations: - secrets.stackable.tech/backend.autotls.cert.lifetime: 7d - - name: tls-kafka-server - ephemeral: - volumeClaimTemplate: - metadata: - annotations: - secrets.stackable.tech/backend.autotls.cert.lifetime: 7d -{% endif %} - - name: tls-kafka-internal - ephemeral: - volumeClaimTemplate: - metadata: - annotations: - secrets.stackable.tech/backend.autotls.cert.lifetime: 7d - - name: log-config - - name: listener-broker - - name: config - - name: log -status: - readyReplicas: 1 - replicas: 1 ---- -apiVersion: v1 -kind: PersistentVolumeClaim -metadata: - name: log-dirs-test-kafka-broker-default-0 -spec: - resources: - requests: - storage: 2Gi -status: - phase: Bound ---- -apiVersion: policy/v1 -kind: PodDisruptionBudget -metadata: - name: test-kafka-broker -status: - expectedPods: 1 - currentHealthy: 1 - disruptionsAllowed: 1 +timeout: 600 +commands: + - script: kubectl -n $NAMESPACE wait --for=condition=available=true kafkaclusters.kafka.stackable.tech/test-kafka --timeout 601s diff --git a/tests/templates/kuttl/smoke/31-assert.yaml b/tests/templates/kuttl/smoke/31-assert.yaml index 26a55394..fe4bbf4e 100644 --- a/tests/templates/kuttl/smoke/31-assert.yaml +++ b/tests/templates/kuttl/smoke/31-assert.yaml @@ -6,14 +6,11 @@ commands: # # Test envOverrides # + # configOverrides are covered by the ConfigMap data snapshot in 34-assert.yaml.j2; + # env overrides cannot be snapshotted there because the operator emits a random + # NODE_ID_OFFSET into the StatefulSet env array (kuttl matches arrays positionally), + # so we keep these targeted yq checks here. - script: | kubectl -n $NAMESPACE get sts test-kafka-broker-default -o yaml | yq -e '.spec.template.spec.containers[] | select (.name == "kafka") | .env[] | select (.name == "COMMON_VAR" and .value == "group-value")' kubectl -n $NAMESPACE get sts test-kafka-broker-default -o yaml | yq -e '.spec.template.spec.containers[] | select (.name == "kafka") | .env[] | select (.name == "GROUP_VAR" and .value == "group-value")' kubectl -n $NAMESPACE get sts test-kafka-broker-default -o yaml | yq -e '.spec.template.spec.containers[] | select (.name == "kafka") | .env[] | select (.name == "ROLE_VAR" and .value == "role-value")' - # - # Test configOverrides - # - - script: | - kubectl -n $NAMESPACE get cm test-kafka-broker-default -o yaml | yq -e '.data."broker.properties"' | grep "compression.type=snappy" - kubectl -n $NAMESPACE get cm test-kafka-broker-default -o yaml | yq -e '.data."broker.properties"' | grep "controller.quorum.election.backoff.max.ms=2000" - kubectl -n $NAMESPACE get cm test-kafka-broker-default -o yaml | yq -e '.data."broker.properties"' | grep "controller.quorum.fetch.timeout.ms=3000" diff --git a/tests/templates/kuttl/smoke/33-assert.yaml.j2 b/tests/templates/kuttl/smoke/33-assert.yaml.j2 new file mode 100644 index 00000000..1ed5fe65 --- /dev/null +++ b/tests/templates/kuttl/smoke/33-assert.yaml.j2 @@ -0,0 +1,316 @@ +{# Templating flags used throughout this file. #} +{% set use_client_tls = test_scenario['values']['use-client-tls'] == 'true' %} +{% set vector_enabled = lookup('env', 'VECTOR_AGGREGATOR') | length > 0 %} +--- +# Declarative shape assertions for every kafka-managed resource in the smoke test +# except ConfigMap *.data* (covered in 34-assert.yaml.j2). +# +# kuttl performs subset matching: any field omitted here is not checked, and the +# live object may carry additional keys/labels. We therefore omit fields that are +# random per install (uids, resourceVersion, clusterIP, dynamic selector labels +# on the bootstrap Service, container `env` because the operator emits a random +# NODE_ID_OFFSET and kuttl matches array elements positionally). +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +timeout: 600 +--- +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: test-kafka-broker-default + generation: 1 # There should be no unneeded Pod restarts + labels: + app.kubernetes.io/component: broker + app.kubernetes.io/instance: test-kafka + app.kubernetes.io/managed-by: kafka.stackable.tech_kafkacluster + app.kubernetes.io/name: kafka + app.kubernetes.io/role-group: default + restarter.stackable.tech/enabled: "true" + stackable.tech/vendor: Stackable + ownerReferences: + - apiVersion: kafka.stackable.tech/v1alpha1 + controller: true + kind: KafkaCluster + name: test-kafka +spec: + podManagementPolicy: Parallel + replicas: 1 + selector: + matchLabels: + app.kubernetes.io/component: broker + app.kubernetes.io/instance: test-kafka + app.kubernetes.io/name: kafka + app.kubernetes.io/role-group: default + serviceName: test-kafka-broker-default-headless + template: + metadata: + labels: + app.kubernetes.io/component: broker + app.kubernetes.io/instance: test-kafka + app.kubernetes.io/managed-by: kafka.stackable.tech_kafkacluster + app.kubernetes.io/name: kafka + app.kubernetes.io/role-group: default + stackable.tech/vendor: Stackable + spec: + serviceAccount: test-kafka-serviceaccount + serviceAccountName: test-kafka-serviceaccount + terminationGracePeriodSeconds: 1800 + containers: + - name: kafka + resources: + limits: + cpu: 1100m # From podOverrides + memory: 2Gi + requests: + cpu: 300m # From podOverrides + memory: 2Gi + - name: kcat-prober + resources: + limits: + cpu: 200m + memory: 128Mi + requests: + cpu: 100m + memory: 128Mi +{% if vector_enabled %} + - name: vector + resources: + limits: + cpu: 500m + memory: 128Mi + requests: + cpu: 250m + memory: 128Mi +{% endif %} + volumes: +{% if use_client_tls %} + - name: tls-kcat + ephemeral: + volumeClaimTemplate: + metadata: + annotations: + secrets.stackable.tech/backend.autotls.cert.lifetime: 7d + - name: tls-kafka-server + ephemeral: + volumeClaimTemplate: + metadata: + annotations: + secrets.stackable.tech/backend.autotls.cert.lifetime: 7d +{% endif %} + - name: tls-kafka-internal + ephemeral: + volumeClaimTemplate: + metadata: + annotations: + secrets.stackable.tech/backend.autotls.cert.lifetime: 7d + - name: log-config + - name: listener-broker + - name: config + - name: log +status: + readyReplicas: 1 + replicas: 1 +--- +apiVersion: v1 +kind: Service +metadata: + name: test-kafka-broker-default-bootstrap + labels: + app.kubernetes.io/component: broker + app.kubernetes.io/instance: test-kafka-broker-default-bootstrap + app.kubernetes.io/managed-by: listeners.stackable.tech_listener + app.kubernetes.io/name: listener + app.kubernetes.io/role-group: default + stackable.tech/vendor: Stackable + ownerReferences: + - apiVersion: listeners.stackable.tech/v1alpha1 + controller: true + kind: Listener + name: test-kafka-broker-default-bootstrap +spec: + # selector is intentionally not asserted: it contains a per-install + # `listener.stackable.tech/mnt.` label. + ports: +{% if use_client_tls %} + - name: kafka-tls + port: 9093 + protocol: TCP + targetPort: 9093 +{% else %} + - name: kafka + port: 9092 + protocol: TCP + targetPort: 9092 +{% endif %} + type: ClusterIP +--- +apiVersion: v1 +kind: Service +metadata: + name: test-kafka-broker-default-headless + labels: + app.kubernetes.io/component: broker + app.kubernetes.io/instance: test-kafka + app.kubernetes.io/managed-by: kafka.stackable.tech_kafkacluster + app.kubernetes.io/name: kafka + app.kubernetes.io/role-group: default + stackable.tech/vendor: Stackable + ownerReferences: + - apiVersion: kafka.stackable.tech/v1alpha1 + controller: true + kind: KafkaCluster + name: test-kafka +spec: + clusterIP: None + ports: +{% if use_client_tls %} + - name: kafka-tls + port: 9093 + protocol: TCP + targetPort: 9093 +{% else %} + - name: kafka + port: 9092 + protocol: TCP + targetPort: 9092 +{% endif %} + publishNotReadyAddresses: true + selector: + app.kubernetes.io/component: broker + app.kubernetes.io/instance: test-kafka + app.kubernetes.io/name: kafka + app.kubernetes.io/role-group: default + type: ClusterIP +--- +apiVersion: v1 +kind: Service +metadata: + name: test-kafka-broker-default-metrics + annotations: + prometheus.io/path: /metrics + prometheus.io/port: "9606" + prometheus.io/scheme: http + prometheus.io/scrape: "true" + labels: + app.kubernetes.io/component: broker + app.kubernetes.io/instance: test-kafka + app.kubernetes.io/managed-by: kafka.stackable.tech_kafkacluster + app.kubernetes.io/name: kafka + app.kubernetes.io/role-group: default + prometheus.io/scrape: "true" + stackable.tech/vendor: Stackable + ownerReferences: + - apiVersion: kafka.stackable.tech/v1alpha1 + controller: true + kind: KafkaCluster + name: test-kafka +spec: + clusterIP: None + ports: + - name: metrics + port: 9606 + protocol: TCP + targetPort: 9606 + publishNotReadyAddresses: true + selector: + app.kubernetes.io/component: broker + app.kubernetes.io/instance: test-kafka + app.kubernetes.io/name: kafka + app.kubernetes.io/role-group: default + type: ClusterIP +--- +apiVersion: listeners.stackable.tech/v1alpha1 +kind: Listener +metadata: + name: test-kafka-broker-default-bootstrap + labels: + app.kubernetes.io/component: broker + app.kubernetes.io/instance: test-kafka + app.kubernetes.io/managed-by: kafka.stackable.tech_kafkacluster + app.kubernetes.io/name: kafka + app.kubernetes.io/role-group: default + stackable.tech/vendor: Stackable + ownerReferences: + - apiVersion: kafka.stackable.tech/v1alpha1 + controller: true + kind: KafkaCluster + name: test-kafka +spec: + className: cluster-internal + ports: +{% if use_client_tls %} + - name: kafka-tls + port: 9093 + protocol: TCP +{% else %} + - name: kafka + port: 9092 + protocol: TCP +{% endif %} +--- +apiVersion: policy/v1 +kind: PodDisruptionBudget +metadata: + name: test-kafka-broker + labels: + app.kubernetes.io/component: broker + app.kubernetes.io/instance: test-kafka + app.kubernetes.io/managed-by: kafka.stackable.tech_kafkacluster + app.kubernetes.io/name: kafka + ownerReferences: + - apiVersion: kafka.stackable.tech/v1alpha1 + controller: true + kind: KafkaCluster + name: test-kafka +spec: + maxUnavailable: 1 + selector: + matchLabels: + app.kubernetes.io/component: broker + app.kubernetes.io/instance: test-kafka + app.kubernetes.io/name: kafka +status: + currentHealthy: 1 + disruptionsAllowed: 1 + expectedPods: 1 +--- +apiVersion: v1 +kind: ServiceAccount +metadata: + name: test-kafka-serviceaccount + labels: + app.kubernetes.io/instance: test-kafka + app.kubernetes.io/managed-by: kafka.stackable.tech_kafkacluster + app.kubernetes.io/name: kafka + ownerReferences: + - apiVersion: kafka.stackable.tech/v1alpha1 + controller: true + kind: KafkaCluster + name: test-kafka +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: RoleBinding +metadata: + name: test-kafka-rolebinding + labels: + app.kubernetes.io/instance: test-kafka + app.kubernetes.io/managed-by: kafka.stackable.tech_kafkacluster + app.kubernetes.io/name: kafka +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: ClusterRole + name: kafka-clusterrole +subjects: + - kind: ServiceAccount + name: test-kafka-serviceaccount +--- +apiVersion: v1 +kind: PersistentVolumeClaim +metadata: + name: log-dirs-test-kafka-broker-default-0 +spec: + resources: + requests: + storage: 2Gi +status: + phase: Bound diff --git a/tests/templates/kuttl/smoke/34-assert.yaml.j2 b/tests/templates/kuttl/smoke/34-assert.yaml.j2 new file mode 100644 index 00000000..27b36f65 --- /dev/null +++ b/tests/templates/kuttl/smoke/34-assert.yaml.j2 @@ -0,0 +1,109 @@ +{# Templating flag used throughout this file. #} +{% set use_client_tls = test_scenario['values']['use-client-tls'] == 'true' %} +--- +# Snapshot the full `.data` of each operator-managed ConfigMap. +# Any code change that alters rendered config values will fail these diffs. +# +# Runs as its own step (after 30-33) so kuttl does not re-evaluate the heavy +# heredocs on every 1-second readiness retry of the install step. By this point +# the cluster is in steady state, so each script runs once. +# +# The heredoc is quoted (`<<'YAMLEOF'`) so shell substitution is disabled and +# property-file escapes like `${env\:POD_NAME}` survive verbatim. Only +# `__NAMESPACE__` is substituted afterwards via `sed`, because kuttl tests run +# in a randomized namespace per invocation. Both sides are normalized to +# canonical JSON via `yq -o=json`; keys are already alphabetical on both sides +# (operator stores BTreeMap; kubectl serializes maps sorted; the heredoc is +# hand-sorted). +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +timeout: 60 +commands: + - script: | + expected=$(cat <<'YAMLEOF' | sed "s|__NAMESPACE__|$NAMESPACE|g" | yq -o=json + KAFKA: test-kafka-broker-default-bootstrap.__NAMESPACE__.svc.cluster.local:{% if use_client_tls %}9093{% else %}9092{% endif %} + + YAMLEOF + ) + actual=$(kubectl -n $NAMESPACE get cm test-kafka -o yaml | yq -o=json '.data') + if [ "$expected" != "$actual" ]; then + echo "ERROR: ConfigMap test-kafka data drifted from snapshot." + echo "=== expected ===" + printf '%s\n' "$expected" + echo "=== actual ===" + printf '%s\n' "$actual" + exit 1 + fi + - script: | + expected=$(cat <<'YAMLEOF' | sed "s|__NAMESPACE__|$NAMESPACE|g" | yq -o=json + broker.properties: | + advertised.listeners=CLIENT\://${file\:UTF-8\:/stackable/listener-broker/default-address/address}\:${file\:UTF-8\:/stackable/listener-broker/default-address/ports/{% if use_client_tls %}kafka-tls{% else %}kafka{% endif %}},INTERNAL\://${env\:POD_NAME}.test-kafka-broker-default-headless.__NAMESPACE__.svc.cluster.local\:19093 + compression.type=snappy + controlled.shutdown.enable=true + controller.quorum.election.backoff.max.ms=2000 + controller.quorum.fetch.timeout.ms=3000 + inter.broker.listener.name=INTERNAL +{% if use_client_tls %} + listener.name.client.ssl.keystore.location=/stackable/tls-kafka-server/keystore.p12 + listener.name.client.ssl.keystore.password= + listener.name.client.ssl.keystore.type=PKCS12 + listener.name.client.ssl.truststore.location=/stackable/tls-kafka-server/truststore.p12 + listener.name.client.ssl.truststore.password= + listener.name.client.ssl.truststore.type=PKCS12 +{% endif %} + listener.name.controller.ssl.keystore.location=/stackable/tls-kafka-internal/keystore.p12 + listener.name.controller.ssl.keystore.password= + listener.name.controller.ssl.keystore.type=PKCS12 + listener.name.controller.ssl.truststore.location=/stackable/tls-kafka-internal/truststore.p12 + listener.name.controller.ssl.truststore.password= + listener.name.controller.ssl.truststore.type=PKCS12 + listener.name.internal.ssl.client.auth=required + listener.name.internal.ssl.keystore.location=/stackable/tls-kafka-internal/keystore.p12 + listener.name.internal.ssl.keystore.password= + listener.name.internal.ssl.keystore.type=PKCS12 + listener.name.internal.ssl.truststore.location=/stackable/tls-kafka-internal/truststore.p12 + listener.name.internal.ssl.truststore.password= + listener.name.internal.ssl.truststore.type=PKCS12 + listener.security.protocol.map=CLIENT\:{% if use_client_tls %}SSL{% else %}PLAINTEXT{% endif %},INTERNAL\:SSL,CONTROLLER\:SSL + listeners=CLIENT\://0.0.0.0\:{% if use_client_tls %}9093{% else %}9092{% endif %},INTERNAL\://0.0.0.0\:19093 + log.dirs=/stackable/data/topicdata + zookeeper.connect=${env\:ZOOKEEPER} + client.properties: | +{% if use_client_tls %} + security.protocol=SSL + ssl.truststore.type=PKCS12 + ssl.truststore.location=/stackable/tls-kafka-server/truststore.p12 + ssl.truststore.password= +{% else %} + security.protocol=PLAINTEXT +{% endif %} + jaas.properties: "" + log4j.properties: |+ + log4j.rootLogger=INFO, CONSOLE, FILE + + log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender + log4j.appender.CONSOLE.Threshold=INFO + log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout + log4j.appender.CONSOLE.layout.ConversionPattern=[%d] %p %m (%c)%n + + log4j.appender.FILE=org.apache.log4j.RollingFileAppender + log4j.appender.FILE.Threshold=INFO + log4j.appender.FILE.File=/stackable/log/kafka/kafka.log4j.xml + log4j.appender.FILE.MaxFileSize=5MB + log4j.appender.FILE.MaxBackupIndex=1 + log4j.appender.FILE.layout=org.apache.log4j.xml.XMLLayout + + security.properties: | + networkaddress.cache.negative.ttl=0 + networkaddress.cache.ttl=30 + YAMLEOF + ) + actual=$(kubectl -n $NAMESPACE get cm test-kafka-broker-default -o yaml | yq -o=json '.data') + if [ "$expected" != "$actual" ]; then + echo "ERROR: ConfigMap test-kafka-broker-default data drifted from snapshot." + echo "=== expected ===" + printf '%s\n' "$expected" + echo "=== actual ===" + printf '%s\n' "$actual" + exit 1 + fi From 08c2f099d72e384a9b9e8c19c3627171832997a3 Mon Sep 17 00:00:00 2001 From: Malte Sander Date: Tue, 19 May 2026 17:21:13 +0200 Subject: [PATCH 5/5] docs: adapted changelog --- CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7728b498..69e95afd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,10 +16,12 @@ All notable changes to this project will be documented in this file. controllers). Previously, arbitrary file names were silently accepted and ignored ([#960]). - Bump `stackable-operator` to 0.111.1 and snafu to 0.9 ([#960], [#961]). +- Internal operator refactoring: introduce dereference() and validate() steps in the reconciler ([#968]). [#953]: https://github.com/stackabletech/kafka-operator/pull/953 [#960]: https://github.com/stackabletech/kafka-operator/pull/960 [#961]: https://github.com/stackabletech/kafka-operator/pull/961 +[#968]: https://github.com/stackabletech/kafka-operator/pull/968 ## [26.3.0] - 2026-03-16