diff --git a/CHANGELOG.md b/CHANGELOG.md index 548e7b68..9f29799a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -22,6 +22,7 @@ - BREAKING: Renamed and moved the `celeryExecutor` broker and results backend to `clusterConfig` ([#786]). The results backend `spec.celeryExecutors.resultBackend` is now `spec.clusterConfig.celeryResultsBackend`. The broker `spec.celeryExecutors.broker` is now `spec.clusterConfig.celeryBroker`. +- Internal operator refactoring: introduce dereference() and validate() steps in the reconciler ([#795]). ### Fixed @@ -37,6 +38,7 @@ [#754]: https://github.com/stackabletech/airflow-operator/pull/754 [#784]: https://github.com/stackabletech/airflow-operator/pull/784 [#786]: https://github.com/stackabletech/airflow-operator/pull/786 +[#795]: https://github.com/stackabletech/airflow-operator/pull/795 ## [26.3.0] - 2026-03-16 diff --git a/rust/operator-binary/src/airflow_controller.rs b/rust/operator-binary/src/airflow_controller.rs index 54671bb3..ea7dd9ad 100644 --- a/rust/operator-binary/src/airflow_controller.rs +++ b/rust/operator-binary/src/airflow_controller.rs @@ -2,7 +2,6 @@ use std::{ collections::{BTreeMap, BTreeSet, HashMap}, io::Write, - str::FromStr, sync::Arc, }; @@ -32,8 +31,7 @@ use stackable_operator::{ cli::OperatorEnvironmentOptions, cluster_resources::{ClusterResourceApplyStrategy, ClusterResources}, commons::{ - product_image_selection::{self, ResolvedProductImage}, - random_secret_creation, + product_image_selection::ResolvedProductImage, random_secret_creation, rbac::build_rbac_resources, }, crd::{authentication::ldap, git_sync, listener}, @@ -65,15 +63,14 @@ use stackable_operator::{ logging::controller::ReconcilerError, product_config_utils::{ CONFIG_OVERRIDE_FILE_FOOTER_KEY, CONFIG_OVERRIDE_FILE_HEADER_KEY, env_vars_from, - env_vars_from_rolegroup_config, transform_all_roles_to_config, - validate_all_roles_and_groups_config, + env_vars_from_rolegroup_config, }, product_logging::{ self, framework::LoggingError, spec::{ContainerLogConfig, Logging}, }, - role_utils::{GenericRoleConfig, RoleGroupRef}, + role_utils::RoleGroupRef, shared::time::Duration, status::condition::{ compute_conditions, operations::ClusterOperationsConditionBuilder, @@ -81,7 +78,7 @@ use stackable_operator::{ }, utils::COMMON_BASH_TRAP_FUNCTIONS, }; -use strum::{EnumDiscriminants, IntoEnumIterator, IntoStaticStr}; +use strum::{EnumDiscriminants, IntoStaticStr}; use crate::{ config::{self, PYTHON_IMPORTS}, @@ -151,21 +148,11 @@ pub enum Error { rolegroup: RoleGroupRef, }, - #[snafu(display("invalid product config"))] - InvalidProductConfig { - source: stackable_operator::product_config_utils::Error, - }, - #[snafu(display("object is missing metadata to build owner reference"))] ObjectMissingMetadataForOwnerRef { source: stackable_operator::builder::meta::Error, }, - #[snafu(display("Failed to transform configs"))] - ProductConfigTransform { - source: stackable_operator::product_config_utils::Error, - }, - #[snafu(display("failed to patch service account"))] ApplyServiceAccount { source: stackable_operator::cluster_resources::Error, @@ -196,12 +183,6 @@ pub enum Error { #[snafu(display("failed to resolve and merge config for role and role group"))] FailedToResolveConfig { source: crd::Error }, - #[snafu(display("could not parse Airflow role [{role}]"))] - UnidentifiedAirflowRole { - source: strum::ParseError, - role: String, - }, - #[snafu(display("invalid container name"))] InvalidContainerName { source: stackable_operator::builder::pod::container::Error, @@ -234,8 +215,20 @@ pub enum Error { source: stackable_operator::client::Error, }, - #[snafu(display("failed to apply authentication configuration"))] - InvalidAuthenticationConfig { source: crd::authentication::Error }, + #[snafu(display("failed to create internal secret"))] + InternalSecret { + source: random_secret_creation::Error, + }, + + #[snafu(display("failed to dereference cluster resources"))] + Dereference { + source: crate::controller::dereference::Error, + }, + + #[snafu(display("failed to validate cluster configuration"))] + Validate { + source: crate::controller::validate::Error, + }, #[snafu(display("pod template serialization"))] PodTemplateSerde { source: serde_yaml::Error }, @@ -320,21 +313,6 @@ pub enum Error { #[snafu(display("failed to configure service"))] ServiceConfiguration { source: crate::service::Error }, - - #[snafu(display("invalid authorization config"))] - InvalidAuthorizationConfig { - source: stackable_operator::commons::opa::Error, - }, - - #[snafu(display("failed to resolve product image"))] - ResolveProductImage { - source: product_image_selection::Error, - }, - - #[snafu(display("failed to create internal secret"))] - InvalidInternalSecret { - source: random_secret_creation::Error, - }, } type Result = std::result::Result; @@ -358,35 +336,14 @@ pub async fn reconcile_airflow( .context(InvalidAirflowClusterSnafu)?; let client = &ctx.client; - let resolved_product_image = airflow - .spec - .image - .resolve( - CONTAINER_IMAGE_BASE_NAME, - &ctx.operator_environment.image_repository, - crate::built_info::PKG_VERSION, - ) - .context(ResolveProductImageSnafu)?; + + let dereferenced = crate::controller::dereference::dereference(client, airflow) + .await + .context(DereferenceSnafu)?; let cluster_operation_cond_builder = ClusterOperationsConditionBuilder::new(&airflow.spec.cluster_operation); - let authentication_config = AirflowClientAuthenticationDetailsResolved::from( - &airflow.spec.cluster_config.authentication, - client, - ) - .await - .context(InvalidAuthenticationConfigSnafu)?; - - let authorization_config = AirflowAuthorizationResolved::from_authorization_config( - client, - airflow, - &airflow.spec.cluster_config.authorization, - ) - .await - .context(InvalidAuthorizationConfigSnafu)?; - // We don't have a config file, but do everything via env substitution - let templating_mechanism = TemplatingMechanism::BashEnvSubstitution; let metadata_database_connection_details = airflow .spec @@ -424,34 +381,49 @@ pub async fn reconcile_airflow( None }; - let mut roles = HashMap::new(); - - // if the kubernetes executor is specified there will be no worker role as the pods - // are provisioned by airflow as defined by the task (default: one pod per task) - for role in AirflowRole::iter() { - if let Some(resolved_role) = airflow.get_role(&role) { - roles.insert( - role.to_string(), - ( - vec![ - PropertyNameKind::Env, - PropertyNameKind::File(AIRFLOW_CONFIG_FILENAME.into()), - ], - resolved_role.clone(), - ), - ); - } - } - - let role_config = transform_all_roles_to_config(airflow, &roles); - let validated_role_config = validate_all_roles_and_groups_config( - &resolved_product_image.product_version, - &role_config.context(ProductConfigTransformSnafu)?, + let validated = crate::controller::validate::validate_cluster( + airflow, + CONTAINER_IMAGE_BASE_NAME, + &ctx.operator_environment.image_repository, + crate::built_info::PKG_VERSION, &ctx.product_config, - false, - false, ) - .context(InvalidProductConfigSnafu)?; + .context(ValidateSnafu)?; + + // TODO: Move secret creation to a dedicated apply step once it exists. + random_secret_creation::create_random_secret_if_not_exists( + &airflow.shared_internal_secret_secret_name(), + INTERNAL_SECRET_SECRET_KEY, + 256, + airflow, + client, + ) + .await + .context(InternalSecretSnafu)?; + + random_secret_creation::create_random_secret_if_not_exists( + &airflow.shared_jwt_secret_secret_name(), + JWT_SECRET_SECRET_KEY, + 256, + airflow, + client, + ) + .await + .context(InternalSecretSnafu)?; + + // https://airflow.apache.org/docs/apache-airflow/stable/security/secrets/fernet.html#security-fernet + // does not document how long the fernet key should be, but recommends using + // python -c "from cryptography.fernet import Fernet; print(Fernet.generate_key().decode())" + // which returns `jUm21LuA76YZmrIa9u4eXRg0h0P24MDC9IDOmDvJbfw=`, which has 44 characters, which makes 32 bytes. + random_secret_creation::create_random_secret_if_not_exists( + &airflow.shared_fernet_key_secret_name(), + FERNET_KEY_SECRET_KEY, + 32, + airflow, + client, + ) + .await + .context(InternalSecretSnafu)?; let mut cluster_resources = ClusterResources::new( APP_NAME, @@ -481,21 +453,19 @@ pub async fn reconcile_airflow( let mut ss_cond_builder = StatefulSetConditionBuilder::default(); - let airflow_executor = &airflow.spec.executor; - // if the kubernetes executor is specified, in place of a worker role that will be in the role // collection there will be a pod template created to be used for pod provisioning if let AirflowExecutor::KubernetesExecutors { common_configuration, - } = &airflow_executor + } = &validated.executor { build_executor_template( airflow, common_configuration, &metadata_database_connection_details, - &resolved_product_image, - &authentication_config, - &authorization_config, + &validated.image, + &dereferenced.authentication_config, + &dereferenced.authorization_config, &mut cluster_resources, client, &rbac_sa, @@ -503,94 +473,53 @@ pub async fn reconcile_airflow( .await?; } - random_secret_creation::create_random_secret_if_not_exists( - &airflow.shared_internal_secret_secret_name(), - INTERNAL_SECRET_SECRET_KEY, - 256, - airflow, - client, - ) - .await - .context(InvalidInternalSecretSnafu)?; + for (airflow_role, role_group_configs) in &validated.role_groups { + let role_name = airflow_role.to_string(); - random_secret_creation::create_random_secret_if_not_exists( - &airflow.shared_jwt_secret_secret_name(), - JWT_SECRET_SECRET_KEY, - 256, - airflow, - client, - ) - .await - .context(InvalidInternalSecretSnafu)?; - - random_secret_creation::create_random_secret_if_not_exists( - &airflow.shared_fernet_key_secret_name(), - FERNET_KEY_SECRET_KEY, - // https://airflow.apache.org/docs/apache-airflow/stable/security/secrets/fernet.html#security-fernet - // does not document how long the fernet key should be, but recommends using - // python -c "from cryptography.fernet import Fernet; print(Fernet.generate_key().decode())" - // which returns `jUm21LuA76YZmrIa9u4eXRg0h0P24MDC9IDOmDvJbfw=`, which has 44 characters, which makes 32 bytes. - 32, - airflow, - client, - ) - .await - .context(InvalidInternalSecretSnafu)?; - - for (role_name, role_config) in validated_role_config.iter() { - let airflow_role = - AirflowRole::from_str(role_name).context(UnidentifiedAirflowRoleSnafu { - role: role_name.to_string(), - })?; - - if let Some(GenericRoleConfig { - pod_disruption_budget: pdb, - }) = airflow.role_config(&airflow_role) - { - add_pdbs(&pdb, airflow, &airflow_role, client, &mut cluster_resources) - .await - .context(FailedToCreatePdbSnafu)?; - } + if let Some(role_config) = validated.role_configs.get(airflow_role) { + if let Some(pdb) = &role_config.pdb { + add_pdbs(pdb, airflow, airflow_role, client, &mut cluster_resources) + .await + .context(FailedToCreatePdbSnafu)?; + } - if let Some(listener_class) = airflow_role.listener_class_name(airflow) { - if let Some(listener_group_name) = airflow.group_listener_name(&airflow_role) { - let rg_group_listener = build_group_listener( - airflow, - build_recommended_labels( + if let Some(listener_class) = &role_config.listener_class { + if let Some(listener_group_name) = &role_config.group_listener_name { + let rg_group_listener = build_group_listener( airflow, - AIRFLOW_CONTROLLER_NAME, - &resolved_product_image.app_version_label_value, - role_name, - "none", - ), - listener_class.to_string(), - listener_group_name, - )?; - cluster_resources - .add(client, rg_group_listener) - .await - .context(ApplyGroupListenerSnafu)?; + build_recommended_labels( + airflow, + AIRFLOW_CONTROLLER_NAME, + &validated.image.app_version_label_value, + &role_name, + "none", + ), + listener_class.to_string(), + listener_group_name.clone(), + )?; + cluster_resources + .add(client, rg_group_listener) + .await + .context(ApplyGroupListenerSnafu)?; + } } } - for (rolegroup_name, rolegroup_config) in role_config.iter() { + for (rolegroup_name, validated_rg_config) in role_group_configs { let rolegroup = RoleGroupRef { cluster: ObjectRef::from_obj(airflow), - role: role_name.into(), + role: role_name.clone(), role_group: rolegroup_name.into(), }; - let merged_airflow_config = airflow - .merged_config(&airflow_role, &rolegroup) - .context(FailedToResolveConfigSnafu)?; - let git_sync_resources = git_sync::v1alpha2::GitSyncResources::new( &airflow.spec.cluster_config.dags_git_sync, - &resolved_product_image, - &env_vars_from_rolegroup_config(rolegroup_config), + &validated.image, + &env_vars_from_rolegroup_config(&validated_rg_config.product_config_properties), &airflow.volume_mounts(), LOG_VOLUME_NAME, - &merged_airflow_config + &validated_rg_config + .merged_config .logging .for_container(&Container::GitSync), ) @@ -599,7 +528,7 @@ pub async fn reconcile_airflow( let role_group_service_recommended_labels = build_recommended_labels( airflow, AIRFLOW_CONTROLLER_NAME, - &resolved_product_image.app_version_label_value, + &validated.image.app_version_label_value, &rolegroup.role, &rolegroup.role_group, ); @@ -643,12 +572,12 @@ pub async fn reconcile_airflow( let rg_configmap = build_rolegroup_config_map( airflow, - &resolved_product_image, + &validated.image, &rolegroup, - rolegroup_config, - &authentication_config, - &authorization_config, - &merged_airflow_config.logging, + &validated_rg_config.product_config_properties, + &dereferenced.authentication_config, + &dereferenced.authorization_config, + &validated_rg_config.merged_config.logging, &Container::Airflow, )?; cluster_resources @@ -658,22 +587,19 @@ pub async fn reconcile_airflow( rolegroup: rolegroup.clone(), })?; - // Note: The StatefulSet needs to be applied after all ConfigMaps and Secrets it mounts - // to prevent unnecessary Pod restarts. - // See https://github.com/stackabletech/commons-operator/issues/111 for details. let rg_statefulset = build_server_rolegroup_statefulset( airflow, - &resolved_product_image, - &airflow_role, + &validated.image, + airflow_role, &rolegroup, - rolegroup_config, - &authentication_config, - &authorization_config, + &validated_rg_config.product_config_properties, + &dereferenced.authentication_config, + &dereferenced.authorization_config, &metadata_database_connection_details, &celery_database_connection_details, &rbac_sa, - &merged_airflow_config, - airflow_executor, + &validated_rg_config.merged_config, + &validated.executor, &git_sync_resources, )?; diff --git a/rust/operator-binary/src/controller/dereference.rs b/rust/operator-binary/src/controller/dereference.rs new file mode 100644 index 00000000..64e38d70 --- /dev/null +++ b/rust/operator-binary/src/controller/dereference.rs @@ -0,0 +1,50 @@ +use snafu::{ResultExt, Snafu}; + +use crate::crd::{ + authentication::AirflowClientAuthenticationDetailsResolved, + authorization::AirflowAuthorizationResolved, v1alpha2, +}; + +#[derive(Snafu, Debug)] +pub enum Error { + #[snafu(display("failed to apply authentication configuration"))] + AuthenticationConfig { + source: crate::crd::authentication::Error, + }, + + #[snafu(display("invalid authorization config"))] + AuthorizationConfig { + source: stackable_operator::commons::opa::Error, + }, +} + +/// External references resolved during the dereference step. +pub struct DereferencedObjects { + pub authentication_config: AirflowClientAuthenticationDetailsResolved, + pub authorization_config: AirflowAuthorizationResolved, +} + +pub async fn dereference( + client: &stackable_operator::client::Client, + airflow: &v1alpha2::AirflowCluster, +) -> Result { + let authentication_config = AirflowClientAuthenticationDetailsResolved::from( + &airflow.spec.cluster_config.authentication, + client, + ) + .await + .context(AuthenticationConfigSnafu)?; + + let authorization_config = AirflowAuthorizationResolved::from_authorization_config( + client, + airflow, + &airflow.spec.cluster_config.authorization, + ) + .await + .context(AuthorizationConfigSnafu)?; + + Ok(DereferencedObjects { + authentication_config, + authorization_config, + }) +} diff --git a/rust/operator-binary/src/controller/mod.rs b/rust/operator-binary/src/controller/mod.rs new file mode 100644 index 00000000..1b261dfe --- /dev/null +++ b/rust/operator-binary/src/controller/mod.rs @@ -0,0 +1,2 @@ +pub mod dereference; +pub mod validate; diff --git a/rust/operator-binary/src/controller/validate.rs b/rust/operator-binary/src/controller/validate.rs new file mode 100644 index 00000000..6eb2a16c --- /dev/null +++ b/rust/operator-binary/src/controller/validate.rs @@ -0,0 +1,163 @@ +use std::{ + collections::{BTreeMap, HashMap}, + str::FromStr, +}; + +use product_config::{ProductConfigManager, types::PropertyNameKind}; +use snafu::{ResultExt, Snafu}; +use stackable_operator::{ + commons::product_image_selection::{self, ResolvedProductImage}, + product_config_utils::{transform_all_roles_to_config, validate_all_roles_and_groups_config}, + role_utils::RoleGroupRef, +}; +use strum::IntoEnumIterator; + +use crate::crd::{AIRFLOW_CONFIG_FILENAME, AirflowConfig, AirflowExecutor, AirflowRole, v1alpha2}; + +#[derive(Snafu, Debug)] +pub enum Error { + #[snafu(display("failed to resolve product image"))] + ResolveProductImage { + source: product_image_selection::Error, + }, + + #[snafu(display("invalid product config"))] + InvalidProductConfig { + source: stackable_operator::product_config_utils::Error, + }, + + #[snafu(display("failed to generate product config"))] + GenerateProductConfig { + source: stackable_operator::product_config_utils::Error, + }, + + #[snafu(display("failed to resolve and merge config for role and role group"))] + FailedToResolveConfig { source: crate::crd::Error }, + + #[snafu(display("could not parse Airflow role [{role}]"))] + UnidentifiedAirflowRole { + source: strum::ParseError, + role: String, + }, +} + +/// Per-role configuration extracted during validation. +#[derive(Clone, Debug)] +pub struct ValidatedRoleConfig { + pub pdb: Option, + pub listener_class: Option, + pub group_listener_name: Option, +} + +/// Per-rolegroup configuration: the merged CRD config plus the product-config properties. +#[derive(Clone, Debug)] +pub struct ValidatedRoleGroupConfig { + pub merged_config: AirflowConfig, + pub product_config_properties: HashMap>, +} + +/// The validated cluster: proves that product-config validation and config merging +/// succeeded for every role and role group before any resources are created. +#[derive(Clone, Debug)] +pub struct ValidatedAirflowCluster { + pub image: ResolvedProductImage, + pub role_groups: BTreeMap>, + pub role_configs: BTreeMap, + pub executor: AirflowExecutor, +} + +pub fn validate_cluster( + airflow: &v1alpha2::AirflowCluster, + image_base_name: &str, + image_repository: &str, + pkg_version: &str, + product_config_manager: &ProductConfigManager, +) -> Result { + let resolved_product_image = airflow + .spec + .image + .resolve(image_base_name, image_repository, pkg_version) + .context(ResolveProductImageSnafu)?; + + let mut roles = HashMap::new(); + + // if the kubernetes executor is specified there will be no worker role as the pods + // are provisioned by airflow as defined by the task (default: one pod per task) + for role in AirflowRole::iter() { + if let Some(resolved_role) = airflow.get_role(&role) { + roles.insert( + role.to_string(), + ( + vec![ + PropertyNameKind::Env, + PropertyNameKind::File(AIRFLOW_CONFIG_FILENAME.into()), + ], + resolved_role.clone(), + ), + ); + } + } + + let role_config = transform_all_roles_to_config(airflow, &roles); + let validated_role_config = validate_all_roles_and_groups_config( + &resolved_product_image.product_version, + &role_config.context(GenerateProductConfigSnafu)?, + product_config_manager, + false, + false, + ) + .context(InvalidProductConfigSnafu)?; + + let mut role_groups = BTreeMap::new(); + let mut role_configs = BTreeMap::new(); + + for (role_name, rolegroup_configs) in validated_role_config.iter() { + let airflow_role = + AirflowRole::from_str(role_name).context(UnidentifiedAirflowRoleSnafu { + role: role_name.to_string(), + })?; + + role_configs.insert( + airflow_role.clone(), + ValidatedRoleConfig { + pdb: airflow + .role_config(&airflow_role) + .map(|rc| rc.pod_disruption_budget), + listener_class: airflow_role + .listener_class_name(airflow) + .map(|s| s.to_string()), + group_listener_name: airflow.group_listener_name(&airflow_role), + }, + ); + + let mut group_configs = BTreeMap::new(); + for (rolegroup_name, rolegroup_config) in rolegroup_configs.iter() { + let rolegroup_ref = RoleGroupRef { + cluster: stackable_operator::kube::runtime::reflector::ObjectRef::from_obj(airflow), + role: role_name.into(), + role_group: rolegroup_name.into(), + }; + + let merged_config = airflow + .merged_config(&airflow_role, &rolegroup_ref) + .context(FailedToResolveConfigSnafu)?; + + group_configs.insert( + rolegroup_name.clone(), + ValidatedRoleGroupConfig { + merged_config, + product_config_properties: rolegroup_config.clone(), + }, + ); + } + + role_groups.insert(airflow_role, group_configs); + } + + Ok(ValidatedAirflowCluster { + image: resolved_product_image, + role_groups, + role_configs, + executor: airflow.spec.executor.clone(), + }) +} diff --git a/rust/operator-binary/src/crd/mod.rs b/rust/operator-binary/src/crd/mod.rs index 3e264113..83c54ef1 100644 --- a/rust/operator-binary/src/crd/mod.rs +++ b/rust/operator-binary/src/crd/mod.rs @@ -581,7 +581,9 @@ pub struct AirflowOpaConfig { Eq, Hash, JsonSchema, + Ord, PartialEq, + PartialOrd, Serialize, EnumString, )] diff --git a/rust/operator-binary/src/main.rs b/rust/operator-binary/src/main.rs index 6cc89e99..dee5603b 100644 --- a/rust/operator-binary/src/main.rs +++ b/rust/operator-binary/src/main.rs @@ -40,6 +40,7 @@ use crate::{ mod airflow_controller; mod config; +mod controller; mod controller_commons; mod crd; mod env_vars;