From c13d5a8c5c4bc762746f101e338ffa8e5a6de815 Mon Sep 17 00:00:00 2001 From: Andrew Kenworthy Date: Tue, 12 May 2026 14:21:48 +0200 Subject: [PATCH 1/9] refactor: derive Ord/PartialOrd on AirflowRole Co-Authored-By: Claude Opus 4.6 --- rust/operator-binary/src/crd/mod.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/rust/operator-binary/src/crd/mod.rs b/rust/operator-binary/src/crd/mod.rs index 3e264113..83c54ef1 100644 --- a/rust/operator-binary/src/crd/mod.rs +++ b/rust/operator-binary/src/crd/mod.rs @@ -581,7 +581,9 @@ pub struct AirflowOpaConfig { Eq, Hash, JsonSchema, + Ord, PartialEq, + PartialOrd, Serialize, EnumString, )] From bb25c1213b3cde833ded54fe6654a18a41c4c75c Mon Sep 17 00:00:00 2001 From: Andrew Kenworthy Date: Tue, 12 May 2026 15:01:12 +0200 Subject: [PATCH 2/9] refactor: extract dereference/validate pipeline from reconcile_airflow Move external resource resolution (product image, auth, authz, internal secrets) into controller::dereference module with its own error enum. Extract config validation and merging into validate_cluster(), which produces a ValidatedAirflowCluster proving all product-config validation succeeded before any Kubernetes resources are created. The validated struct owns the resolved product image and per-role/ per-rolegroup merged configs. Existing build functions are unchanged and receive their parameters from the validated structs. Co-Authored-By: Claude Opus 4.6 --- .../operator-binary/src/airflow_controller.rs | 352 +++++++++--------- .../src/controller/dereference.rs | 107 ++++++ rust/operator-binary/src/controller/mod.rs | 1 + rust/operator-binary/src/main.rs | 1 + 4 files changed, 287 insertions(+), 174 deletions(-) create mode 100644 rust/operator-binary/src/controller/dereference.rs create mode 100644 rust/operator-binary/src/controller/mod.rs diff --git a/rust/operator-binary/src/airflow_controller.rs b/rust/operator-binary/src/airflow_controller.rs index 54671bb3..5a266840 100644 --- a/rust/operator-binary/src/airflow_controller.rs +++ b/rust/operator-binary/src/airflow_controller.rs @@ -31,11 +31,7 @@ use stackable_operator::{ }, cli::OperatorEnvironmentOptions, cluster_resources::{ClusterResourceApplyStrategy, ClusterResources}, - commons::{ - product_image_selection::{self, ResolvedProductImage}, - random_secret_creation, - rbac::build_rbac_resources, - }, + commons::{product_image_selection::ResolvedProductImage, rbac::build_rbac_resources}, crd::{authentication::ldap, git_sync, listener}, database_connections::{ TemplatingMechanism, @@ -73,7 +69,7 @@ use stackable_operator::{ framework::LoggingError, spec::{ContainerLogConfig, Logging}, }, - role_utils::{GenericRoleConfig, RoleGroupRef}, + role_utils::RoleGroupRef, shared::time::Duration, status::condition::{ compute_conditions, operations::ClusterOperationsConditionBuilder, @@ -96,11 +92,7 @@ use crate::{ AirflowAuthenticationClassResolved, AirflowClientAuthenticationDetailsResolved, }, authorization::AirflowAuthorizationResolved, - build_recommended_labels, - internal_secret::{ - FERNET_KEY_SECRET_KEY, INTERNAL_SECRET_SECRET_KEY, JWT_SECRET_SECRET_KEY, - }, - v1alpha2, + build_recommended_labels, v1alpha2, }, env_vars::{self, build_airflow_template_envs}, operations::{ @@ -127,6 +119,33 @@ pub struct Ctx { pub operator_environment: OperatorEnvironmentOptions, } +/// Per-role configuration extracted during validation. +#[derive(Clone, Debug)] +pub struct ValidatedRoleConfig { + pub pdb: Option, + pub listener_class: Option, + pub group_listener_name: Option, +} + +/// Per-rolegroup configuration: the merged CRD config plus the product-config properties. +#[derive(Clone, Debug)] +pub struct ValidatedRoleGroupConfig { + pub merged_config: AirflowConfig, + pub product_config_properties: HashMap>, +} + +pub use crate::controller::dereference::DereferencedObjects; + +/// The validated cluster: proves that product-config validation and config merging +/// succeeded for every role and role group before any resources are created. +#[derive(Clone, Debug)] +pub struct ValidatedAirflowCluster { + pub image: ResolvedProductImage, + pub role_groups: BTreeMap>, + pub role_configs: BTreeMap, + pub executor: AirflowExecutor, +} + #[derive(Snafu, Debug, EnumDiscriminants)] #[strum_discriminants(derive(IntoStaticStr))] pub enum Error { @@ -234,8 +253,10 @@ pub enum Error { source: stackable_operator::client::Error, }, - #[snafu(display("failed to apply authentication configuration"))] - InvalidAuthenticationConfig { source: crd::authentication::Error }, + #[snafu(display("failed to dereference cluster resources"))] + Dereference { + source: crate::controller::dereference::Error, + }, #[snafu(display("pod template serialization"))] PodTemplateSerde { source: serde_yaml::Error }, @@ -320,21 +341,6 @@ pub enum Error { #[snafu(display("failed to configure service"))] ServiceConfiguration { source: crate::service::Error }, - - #[snafu(display("invalid authorization config"))] - InvalidAuthorizationConfig { - source: stackable_operator::commons::opa::Error, - }, - - #[snafu(display("failed to resolve product image"))] - ResolveProductImage { - source: product_image_selection::Error, - }, - - #[snafu(display("failed to create internal secret"))] - InvalidInternalSecret { - source: random_secret_creation::Error, - }, } type Result = std::result::Result; @@ -345,6 +351,92 @@ impl ReconcilerError for Error { } } +fn validate_cluster( + airflow: &v1alpha2::AirflowCluster, + dereferenced: &DereferencedObjects, + product_config_manager: &ProductConfigManager, +) -> Result { + let mut roles = HashMap::new(); + + for role in AirflowRole::iter() { + if let Some(resolved_role) = airflow.get_role(&role) { + roles.insert( + role.to_string(), + ( + vec![ + PropertyNameKind::Env, + PropertyNameKind::File(AIRFLOW_CONFIG_FILENAME.into()), + ], + resolved_role.clone(), + ), + ); + } + } + + let role_config = transform_all_roles_to_config(airflow, &roles); + let validated_role_config = validate_all_roles_and_groups_config( + &dereferenced.resolved_product_image.product_version, + &role_config.context(ProductConfigTransformSnafu)?, + product_config_manager, + false, + false, + ) + .context(InvalidProductConfigSnafu)?; + + let mut role_groups = BTreeMap::new(); + let mut role_configs = BTreeMap::new(); + + for (role_name, rolegroup_configs) in validated_role_config.iter() { + let airflow_role = + AirflowRole::from_str(role_name).context(UnidentifiedAirflowRoleSnafu { + role: role_name.to_string(), + })?; + + role_configs.insert( + airflow_role.clone(), + ValidatedRoleConfig { + pdb: airflow + .role_config(&airflow_role) + .map(|rc| rc.pod_disruption_budget), + listener_class: airflow_role + .listener_class_name(airflow) + .map(|s| s.to_string()), + group_listener_name: airflow.group_listener_name(&airflow_role), + }, + ); + + let mut group_configs = BTreeMap::new(); + for (rolegroup_name, rolegroup_config) in rolegroup_configs.iter() { + let rolegroup_ref = RoleGroupRef { + cluster: ObjectRef::from_obj(airflow), + role: role_name.into(), + role_group: rolegroup_name.into(), + }; + + let merged_config = airflow + .merged_config(&airflow_role, &rolegroup_ref) + .context(FailedToResolveConfigSnafu)?; + + group_configs.insert( + rolegroup_name.clone(), + ValidatedRoleGroupConfig { + merged_config, + product_config_properties: rolegroup_config.clone(), + }, + ); + } + + role_groups.insert(airflow_role, group_configs); + } + + Ok(ValidatedAirflowCluster { + image: dereferenced.resolved_product_image.clone(), + role_groups, + role_configs, + executor: airflow.spec.executor.clone(), + }) +} + pub async fn reconcile_airflow( airflow: Arc>, ctx: Arc, @@ -358,34 +450,19 @@ pub async fn reconcile_airflow( .context(InvalidAirflowClusterSnafu)?; let client = &ctx.client; - let resolved_product_image = airflow - .spec - .image - .resolve( - CONTAINER_IMAGE_BASE_NAME, - &ctx.operator_environment.image_repository, - crate::built_info::PKG_VERSION, - ) - .context(ResolveProductImageSnafu)?; - let cluster_operation_cond_builder = - ClusterOperationsConditionBuilder::new(&airflow.spec.cluster_operation); - - let authentication_config = AirflowClientAuthenticationDetailsResolved::from( - &airflow.spec.cluster_config.authentication, - client, - ) - .await - .context(InvalidAuthenticationConfigSnafu)?; - - let authorization_config = AirflowAuthorizationResolved::from_authorization_config( + let dereferenced = crate::controller::dereference::dereference( client, airflow, - &airflow.spec.cluster_config.authorization, + CONTAINER_IMAGE_BASE_NAME, + &ctx.operator_environment.image_repository, + crate::built_info::PKG_VERSION, ) .await - .context(InvalidAuthorizationConfigSnafu)?; - // We don't have a config file, but do everything via env substitution + .context(DereferenceSnafu)?; + + let cluster_operation_cond_builder = + ClusterOperationsConditionBuilder::new(&airflow.spec.cluster_operation); let templating_mechanism = TemplatingMechanism::BashEnvSubstitution; let metadata_database_connection_details = airflow @@ -424,34 +501,7 @@ pub async fn reconcile_airflow( None }; - let mut roles = HashMap::new(); - - // if the kubernetes executor is specified there will be no worker role as the pods - // are provisioned by airflow as defined by the task (default: one pod per task) - for role in AirflowRole::iter() { - if let Some(resolved_role) = airflow.get_role(&role) { - roles.insert( - role.to_string(), - ( - vec![ - PropertyNameKind::Env, - PropertyNameKind::File(AIRFLOW_CONFIG_FILENAME.into()), - ], - resolved_role.clone(), - ), - ); - } - } - - let role_config = transform_all_roles_to_config(airflow, &roles); - let validated_role_config = validate_all_roles_and_groups_config( - &resolved_product_image.product_version, - &role_config.context(ProductConfigTransformSnafu)?, - &ctx.product_config, - false, - false, - ) - .context(InvalidProductConfigSnafu)?; + let validated = validate_cluster(airflow, &dereferenced, &ctx.product_config)?; let mut cluster_resources = ClusterResources::new( APP_NAME, @@ -481,21 +531,19 @@ pub async fn reconcile_airflow( let mut ss_cond_builder = StatefulSetConditionBuilder::default(); - let airflow_executor = &airflow.spec.executor; - // if the kubernetes executor is specified, in place of a worker role that will be in the role // collection there will be a pod template created to be used for pod provisioning if let AirflowExecutor::KubernetesExecutors { common_configuration, - } = &airflow_executor + } = &validated.executor { build_executor_template( airflow, common_configuration, &metadata_database_connection_details, - &resolved_product_image, - &authentication_config, - &authorization_config, + &validated.image, + &dereferenced.authentication_config, + &dereferenced.authorization_config, &mut cluster_resources, client, &rbac_sa, @@ -503,94 +551,53 @@ pub async fn reconcile_airflow( .await?; } - random_secret_creation::create_random_secret_if_not_exists( - &airflow.shared_internal_secret_secret_name(), - INTERNAL_SECRET_SECRET_KEY, - 256, - airflow, - client, - ) - .await - .context(InvalidInternalSecretSnafu)?; + for (airflow_role, role_group_configs) in &validated.role_groups { + let role_name = airflow_role.to_string(); - random_secret_creation::create_random_secret_if_not_exists( - &airflow.shared_jwt_secret_secret_name(), - JWT_SECRET_SECRET_KEY, - 256, - airflow, - client, - ) - .await - .context(InvalidInternalSecretSnafu)?; - - random_secret_creation::create_random_secret_if_not_exists( - &airflow.shared_fernet_key_secret_name(), - FERNET_KEY_SECRET_KEY, - // https://airflow.apache.org/docs/apache-airflow/stable/security/secrets/fernet.html#security-fernet - // does not document how long the fernet key should be, but recommends using - // python -c "from cryptography.fernet import Fernet; print(Fernet.generate_key().decode())" - // which returns `jUm21LuA76YZmrIa9u4eXRg0h0P24MDC9IDOmDvJbfw=`, which has 44 characters, which makes 32 bytes. - 32, - airflow, - client, - ) - .await - .context(InvalidInternalSecretSnafu)?; - - for (role_name, role_config) in validated_role_config.iter() { - let airflow_role = - AirflowRole::from_str(role_name).context(UnidentifiedAirflowRoleSnafu { - role: role_name.to_string(), - })?; - - if let Some(GenericRoleConfig { - pod_disruption_budget: pdb, - }) = airflow.role_config(&airflow_role) - { - add_pdbs(&pdb, airflow, &airflow_role, client, &mut cluster_resources) - .await - .context(FailedToCreatePdbSnafu)?; - } + if let Some(role_config) = validated.role_configs.get(airflow_role) { + if let Some(pdb) = &role_config.pdb { + add_pdbs(pdb, airflow, airflow_role, client, &mut cluster_resources) + .await + .context(FailedToCreatePdbSnafu)?; + } - if let Some(listener_class) = airflow_role.listener_class_name(airflow) { - if let Some(listener_group_name) = airflow.group_listener_name(&airflow_role) { - let rg_group_listener = build_group_listener( - airflow, - build_recommended_labels( + if let Some(listener_class) = &role_config.listener_class { + if let Some(listener_group_name) = &role_config.group_listener_name { + let rg_group_listener = build_group_listener( airflow, - AIRFLOW_CONTROLLER_NAME, - &resolved_product_image.app_version_label_value, - role_name, - "none", - ), - listener_class.to_string(), - listener_group_name, - )?; - cluster_resources - .add(client, rg_group_listener) - .await - .context(ApplyGroupListenerSnafu)?; + build_recommended_labels( + airflow, + AIRFLOW_CONTROLLER_NAME, + &validated.image.app_version_label_value, + &role_name, + "none", + ), + listener_class.to_string(), + listener_group_name.clone(), + )?; + cluster_resources + .add(client, rg_group_listener) + .await + .context(ApplyGroupListenerSnafu)?; + } } } - for (rolegroup_name, rolegroup_config) in role_config.iter() { + for (rolegroup_name, validated_rg_config) in role_group_configs { let rolegroup = RoleGroupRef { cluster: ObjectRef::from_obj(airflow), - role: role_name.into(), + role: role_name.clone(), role_group: rolegroup_name.into(), }; - let merged_airflow_config = airflow - .merged_config(&airflow_role, &rolegroup) - .context(FailedToResolveConfigSnafu)?; - let git_sync_resources = git_sync::v1alpha2::GitSyncResources::new( &airflow.spec.cluster_config.dags_git_sync, - &resolved_product_image, - &env_vars_from_rolegroup_config(rolegroup_config), + &validated.image, + &env_vars_from_rolegroup_config(&validated_rg_config.product_config_properties), &airflow.volume_mounts(), LOG_VOLUME_NAME, - &merged_airflow_config + &validated_rg_config + .merged_config .logging .for_container(&Container::GitSync), ) @@ -599,7 +606,7 @@ pub async fn reconcile_airflow( let role_group_service_recommended_labels = build_recommended_labels( airflow, AIRFLOW_CONTROLLER_NAME, - &resolved_product_image.app_version_label_value, + &validated.image.app_version_label_value, &rolegroup.role, &rolegroup.role_group, ); @@ -643,12 +650,12 @@ pub async fn reconcile_airflow( let rg_configmap = build_rolegroup_config_map( airflow, - &resolved_product_image, + &validated.image, &rolegroup, - rolegroup_config, - &authentication_config, - &authorization_config, - &merged_airflow_config.logging, + &validated_rg_config.product_config_properties, + &dereferenced.authentication_config, + &dereferenced.authorization_config, + &validated_rg_config.merged_config.logging, &Container::Airflow, )?; cluster_resources @@ -658,22 +665,19 @@ pub async fn reconcile_airflow( rolegroup: rolegroup.clone(), })?; - // Note: The StatefulSet needs to be applied after all ConfigMaps and Secrets it mounts - // to prevent unnecessary Pod restarts. - // See https://github.com/stackabletech/commons-operator/issues/111 for details. let rg_statefulset = build_server_rolegroup_statefulset( airflow, - &resolved_product_image, - &airflow_role, + &validated.image, + airflow_role, &rolegroup, - rolegroup_config, - &authentication_config, - &authorization_config, + &validated_rg_config.product_config_properties, + &dereferenced.authentication_config, + &dereferenced.authorization_config, &metadata_database_connection_details, &celery_database_connection_details, &rbac_sa, - &merged_airflow_config, - airflow_executor, + &validated_rg_config.merged_config, + &validated.executor, &git_sync_resources, )?; diff --git a/rust/operator-binary/src/controller/dereference.rs b/rust/operator-binary/src/controller/dereference.rs new file mode 100644 index 00000000..da62f509 --- /dev/null +++ b/rust/operator-binary/src/controller/dereference.rs @@ -0,0 +1,107 @@ +use snafu::{ResultExt, Snafu}; +use stackable_operator::commons::{ + product_image_selection::{self, ResolvedProductImage}, + random_secret_creation, +}; + +use crate::crd::{ + authentication::AirflowClientAuthenticationDetailsResolved, + authorization::AirflowAuthorizationResolved, + internal_secret::{FERNET_KEY_SECRET_KEY, INTERNAL_SECRET_SECRET_KEY, JWT_SECRET_SECRET_KEY}, + v1alpha2, +}; + +#[derive(Snafu, Debug)] +pub enum Error { + #[snafu(display("failed to resolve product image"))] + ResolveProductImage { + source: product_image_selection::Error, + }, + + #[snafu(display("failed to apply authentication configuration"))] + InvalidAuthenticationConfig { + source: crate::crd::authentication::Error, + }, + + #[snafu(display("invalid authorization config"))] + InvalidAuthorizationConfig { + source: stackable_operator::commons::opa::Error, + }, + + #[snafu(display("failed to create internal secret"))] + InvalidInternalSecret { + source: random_secret_creation::Error, + }, +} + +/// External references resolved during the dereference step. +pub struct DereferencedObjects { + pub resolved_product_image: ResolvedProductImage, + pub authentication_config: AirflowClientAuthenticationDetailsResolved, + pub authorization_config: AirflowAuthorizationResolved, +} + +pub async fn dereference( + client: &stackable_operator::client::Client, + airflow: &v1alpha2::AirflowCluster, + image_base_name: &str, + image_repository: &str, + pkg_version: &str, +) -> Result { + let resolved_product_image = airflow + .spec + .image + .resolve(image_base_name, image_repository, pkg_version) + .context(ResolveProductImageSnafu)?; + + let authentication_config = AirflowClientAuthenticationDetailsResolved::from( + &airflow.spec.cluster_config.authentication, + client, + ) + .await + .context(InvalidAuthenticationConfigSnafu)?; + + let authorization_config = AirflowAuthorizationResolved::from_authorization_config( + client, + airflow, + &airflow.spec.cluster_config.authorization, + ) + .await + .context(InvalidAuthorizationConfigSnafu)?; + + random_secret_creation::create_random_secret_if_not_exists( + &airflow.shared_internal_secret_secret_name(), + INTERNAL_SECRET_SECRET_KEY, + 256, + airflow, + client, + ) + .await + .context(InvalidInternalSecretSnafu)?; + + random_secret_creation::create_random_secret_if_not_exists( + &airflow.shared_jwt_secret_secret_name(), + JWT_SECRET_SECRET_KEY, + 256, + airflow, + client, + ) + .await + .context(InvalidInternalSecretSnafu)?; + + random_secret_creation::create_random_secret_if_not_exists( + &airflow.shared_fernet_key_secret_name(), + FERNET_KEY_SECRET_KEY, + 32, + airflow, + client, + ) + .await + .context(InvalidInternalSecretSnafu)?; + + Ok(DereferencedObjects { + resolved_product_image, + authentication_config, + authorization_config, + }) +} diff --git a/rust/operator-binary/src/controller/mod.rs b/rust/operator-binary/src/controller/mod.rs new file mode 100644 index 00000000..d806dab0 --- /dev/null +++ b/rust/operator-binary/src/controller/mod.rs @@ -0,0 +1 @@ +pub mod dereference; diff --git a/rust/operator-binary/src/main.rs b/rust/operator-binary/src/main.rs index 6cc89e99..dee5603b 100644 --- a/rust/operator-binary/src/main.rs +++ b/rust/operator-binary/src/main.rs @@ -40,6 +40,7 @@ use crate::{ mod airflow_controller; mod config; +mod controller; mod controller_commons; mod crd; mod env_vars; From 0b2f83c06d06fbc67f36de7bb8813cb38d9912c9 Mon Sep 17 00:00:00 2001 From: Andrew Kenworthy Date: Tue, 12 May 2026 15:23:00 +0200 Subject: [PATCH 3/9] fix: restore fernet key length comment in dereference module Co-Authored-By: Claude Opus 4.6 --- rust/operator-binary/src/controller/dereference.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/rust/operator-binary/src/controller/dereference.rs b/rust/operator-binary/src/controller/dereference.rs index da62f509..24303c64 100644 --- a/rust/operator-binary/src/controller/dereference.rs +++ b/rust/operator-binary/src/controller/dereference.rs @@ -89,6 +89,10 @@ pub async fn dereference( .await .context(InvalidInternalSecretSnafu)?; + // https://airflow.apache.org/docs/apache-airflow/stable/security/secrets/fernet.html#security-fernet + // does not document how long the fernet key should be, but recommends using + // python -c "from cryptography.fernet import Fernet; print(Fernet.generate_key().decode())" + // which returns `jUm21LuA76YZmrIa9u4eXRg0h0P24MDC9IDOmDvJbfw=`, which has 44 characters, which makes 32 bytes. random_secret_creation::create_random_secret_if_not_exists( &airflow.shared_fernet_key_secret_name(), FERNET_KEY_SECRET_KEY, From 6fe6753769657d2cc5ed765a5fe90fe9f10ac663 Mon Sep 17 00:00:00 2001 From: Andrew Kenworthy Date: Tue, 12 May 2026 15:49:09 +0200 Subject: [PATCH 4/9] fix: restore kubernetes executor comment in validate_cluster Co-Authored-By: Claude Opus 4.6 --- rust/operator-binary/src/airflow_controller.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/rust/operator-binary/src/airflow_controller.rs b/rust/operator-binary/src/airflow_controller.rs index 5a266840..a1f226d2 100644 --- a/rust/operator-binary/src/airflow_controller.rs +++ b/rust/operator-binary/src/airflow_controller.rs @@ -358,6 +358,8 @@ fn validate_cluster( ) -> Result { let mut roles = HashMap::new(); + // if the kubernetes executor is specified there will be no worker role as the pods + // are provisioned by airflow as defined by the task (default: one pod per task) for role in AirflowRole::iter() { if let Some(resolved_role) = airflow.get_role(&role) { roles.insert( From b052a9b7c20d2bc08180141c2f7a16bb4500e951 Mon Sep 17 00:00:00 2001 From: Andrew Kenworthy Date: Tue, 12 May 2026 17:22:37 +0200 Subject: [PATCH 5/9] refactor: extract validate_cluster into controller::validate module Co-Authored-By: Claude Opus 4.6 --- .../operator-binary/src/airflow_controller.rs | 146 ++--------------- rust/operator-binary/src/controller/mod.rs | 1 + .../src/controller/validate.rs | 151 ++++++++++++++++++ 3 files changed, 162 insertions(+), 136 deletions(-) create mode 100644 rust/operator-binary/src/controller/validate.rs diff --git a/rust/operator-binary/src/airflow_controller.rs b/rust/operator-binary/src/airflow_controller.rs index a1f226d2..a90bcc0a 100644 --- a/rust/operator-binary/src/airflow_controller.rs +++ b/rust/operator-binary/src/airflow_controller.rs @@ -2,7 +2,6 @@ use std::{ collections::{BTreeMap, BTreeSet, HashMap}, io::Write, - str::FromStr, sync::Arc, }; @@ -61,8 +60,7 @@ use stackable_operator::{ logging::controller::ReconcilerError, product_config_utils::{ CONFIG_OVERRIDE_FILE_FOOTER_KEY, CONFIG_OVERRIDE_FILE_HEADER_KEY, env_vars_from, - env_vars_from_rolegroup_config, transform_all_roles_to_config, - validate_all_roles_and_groups_config, + env_vars_from_rolegroup_config, }, product_logging::{ self, @@ -77,7 +75,7 @@ use stackable_operator::{ }, utils::COMMON_BASH_TRAP_FUNCTIONS, }; -use strum::{EnumDiscriminants, IntoEnumIterator, IntoStaticStr}; +use strum::{EnumDiscriminants, IntoStaticStr}; use crate::{ config::{self, PYTHON_IMPORTS}, @@ -119,33 +117,6 @@ pub struct Ctx { pub operator_environment: OperatorEnvironmentOptions, } -/// Per-role configuration extracted during validation. -#[derive(Clone, Debug)] -pub struct ValidatedRoleConfig { - pub pdb: Option, - pub listener_class: Option, - pub group_listener_name: Option, -} - -/// Per-rolegroup configuration: the merged CRD config plus the product-config properties. -#[derive(Clone, Debug)] -pub struct ValidatedRoleGroupConfig { - pub merged_config: AirflowConfig, - pub product_config_properties: HashMap>, -} - -pub use crate::controller::dereference::DereferencedObjects; - -/// The validated cluster: proves that product-config validation and config merging -/// succeeded for every role and role group before any resources are created. -#[derive(Clone, Debug)] -pub struct ValidatedAirflowCluster { - pub image: ResolvedProductImage, - pub role_groups: BTreeMap>, - pub role_configs: BTreeMap, - pub executor: AirflowExecutor, -} - #[derive(Snafu, Debug, EnumDiscriminants)] #[strum_discriminants(derive(IntoStaticStr))] pub enum Error { @@ -170,21 +141,11 @@ pub enum Error { rolegroup: RoleGroupRef, }, - #[snafu(display("invalid product config"))] - InvalidProductConfig { - source: stackable_operator::product_config_utils::Error, - }, - #[snafu(display("object is missing metadata to build owner reference"))] ObjectMissingMetadataForOwnerRef { source: stackable_operator::builder::meta::Error, }, - #[snafu(display("Failed to transform configs"))] - ProductConfigTransform { - source: stackable_operator::product_config_utils::Error, - }, - #[snafu(display("failed to patch service account"))] ApplyServiceAccount { source: stackable_operator::cluster_resources::Error, @@ -215,12 +176,6 @@ pub enum Error { #[snafu(display("failed to resolve and merge config for role and role group"))] FailedToResolveConfig { source: crd::Error }, - #[snafu(display("could not parse Airflow role [{role}]"))] - UnidentifiedAirflowRole { - source: strum::ParseError, - role: String, - }, - #[snafu(display("invalid container name"))] InvalidContainerName { source: stackable_operator::builder::pod::container::Error, @@ -258,6 +213,11 @@ pub enum Error { source: crate::controller::dereference::Error, }, + #[snafu(display("failed to validate cluster configuration"))] + Validate { + source: crate::controller::validate::Error, + }, + #[snafu(display("pod template serialization"))] PodTemplateSerde { source: serde_yaml::Error }, @@ -351,94 +311,6 @@ impl ReconcilerError for Error { } } -fn validate_cluster( - airflow: &v1alpha2::AirflowCluster, - dereferenced: &DereferencedObjects, - product_config_manager: &ProductConfigManager, -) -> Result { - let mut roles = HashMap::new(); - - // if the kubernetes executor is specified there will be no worker role as the pods - // are provisioned by airflow as defined by the task (default: one pod per task) - for role in AirflowRole::iter() { - if let Some(resolved_role) = airflow.get_role(&role) { - roles.insert( - role.to_string(), - ( - vec![ - PropertyNameKind::Env, - PropertyNameKind::File(AIRFLOW_CONFIG_FILENAME.into()), - ], - resolved_role.clone(), - ), - ); - } - } - - let role_config = transform_all_roles_to_config(airflow, &roles); - let validated_role_config = validate_all_roles_and_groups_config( - &dereferenced.resolved_product_image.product_version, - &role_config.context(ProductConfigTransformSnafu)?, - product_config_manager, - false, - false, - ) - .context(InvalidProductConfigSnafu)?; - - let mut role_groups = BTreeMap::new(); - let mut role_configs = BTreeMap::new(); - - for (role_name, rolegroup_configs) in validated_role_config.iter() { - let airflow_role = - AirflowRole::from_str(role_name).context(UnidentifiedAirflowRoleSnafu { - role: role_name.to_string(), - })?; - - role_configs.insert( - airflow_role.clone(), - ValidatedRoleConfig { - pdb: airflow - .role_config(&airflow_role) - .map(|rc| rc.pod_disruption_budget), - listener_class: airflow_role - .listener_class_name(airflow) - .map(|s| s.to_string()), - group_listener_name: airflow.group_listener_name(&airflow_role), - }, - ); - - let mut group_configs = BTreeMap::new(); - for (rolegroup_name, rolegroup_config) in rolegroup_configs.iter() { - let rolegroup_ref = RoleGroupRef { - cluster: ObjectRef::from_obj(airflow), - role: role_name.into(), - role_group: rolegroup_name.into(), - }; - - let merged_config = airflow - .merged_config(&airflow_role, &rolegroup_ref) - .context(FailedToResolveConfigSnafu)?; - - group_configs.insert( - rolegroup_name.clone(), - ValidatedRoleGroupConfig { - merged_config, - product_config_properties: rolegroup_config.clone(), - }, - ); - } - - role_groups.insert(airflow_role, group_configs); - } - - Ok(ValidatedAirflowCluster { - image: dereferenced.resolved_product_image.clone(), - role_groups, - role_configs, - executor: airflow.spec.executor.clone(), - }) -} - pub async fn reconcile_airflow( airflow: Arc>, ctx: Arc, @@ -503,7 +375,9 @@ pub async fn reconcile_airflow( None }; - let validated = validate_cluster(airflow, &dereferenced, &ctx.product_config)?; + let validated = + crate::controller::validate::validate_cluster(airflow, &dereferenced, &ctx.product_config) + .context(ValidateSnafu)?; let mut cluster_resources = ClusterResources::new( APP_NAME, diff --git a/rust/operator-binary/src/controller/mod.rs b/rust/operator-binary/src/controller/mod.rs index d806dab0..1b261dfe 100644 --- a/rust/operator-binary/src/controller/mod.rs +++ b/rust/operator-binary/src/controller/mod.rs @@ -1 +1,2 @@ pub mod dereference; +pub mod validate; diff --git a/rust/operator-binary/src/controller/validate.rs b/rust/operator-binary/src/controller/validate.rs new file mode 100644 index 00000000..12fbc2bb --- /dev/null +++ b/rust/operator-binary/src/controller/validate.rs @@ -0,0 +1,151 @@ +use std::{ + collections::{BTreeMap, HashMap}, + str::FromStr, +}; + +use product_config::{ProductConfigManager, types::PropertyNameKind}; +use snafu::{ResultExt, Snafu}; +use stackable_operator::{ + commons::product_image_selection::ResolvedProductImage, + product_config_utils::{transform_all_roles_to_config, validate_all_roles_and_groups_config}, + role_utils::RoleGroupRef, +}; +use strum::IntoEnumIterator; + +use super::dereference::DereferencedObjects; +use crate::crd::{AIRFLOW_CONFIG_FILENAME, AirflowConfig, AirflowExecutor, AirflowRole, v1alpha2}; + +#[derive(Snafu, Debug)] +pub enum Error { + #[snafu(display("invalid product config"))] + InvalidProductConfig { + source: stackable_operator::product_config_utils::Error, + }, + + #[snafu(display("Failed to transform configs"))] + ProductConfigTransform { + source: stackable_operator::product_config_utils::Error, + }, + + #[snafu(display("failed to resolve and merge config for role and role group"))] + FailedToResolveConfig { source: crate::crd::Error }, + + #[snafu(display("could not parse Airflow role [{role}]"))] + UnidentifiedAirflowRole { + source: strum::ParseError, + role: String, + }, +} + +/// Per-role configuration extracted during validation. +#[derive(Clone, Debug)] +pub struct ValidatedRoleConfig { + pub pdb: Option, + pub listener_class: Option, + pub group_listener_name: Option, +} + +/// Per-rolegroup configuration: the merged CRD config plus the product-config properties. +#[derive(Clone, Debug)] +pub struct ValidatedRoleGroupConfig { + pub merged_config: AirflowConfig, + pub product_config_properties: HashMap>, +} + +/// The validated cluster: proves that product-config validation and config merging +/// succeeded for every role and role group before any resources are created. +#[derive(Clone, Debug)] +pub struct ValidatedAirflowCluster { + pub image: ResolvedProductImage, + pub role_groups: BTreeMap>, + pub role_configs: BTreeMap, + pub executor: AirflowExecutor, +} + +pub fn validate_cluster( + airflow: &v1alpha2::AirflowCluster, + dereferenced: &DereferencedObjects, + product_config_manager: &ProductConfigManager, +) -> Result { + let mut roles = HashMap::new(); + + // if the kubernetes executor is specified there will be no worker role as the pods + // are provisioned by airflow as defined by the task (default: one pod per task) + for role in AirflowRole::iter() { + if let Some(resolved_role) = airflow.get_role(&role) { + roles.insert( + role.to_string(), + ( + vec![ + PropertyNameKind::Env, + PropertyNameKind::File(AIRFLOW_CONFIG_FILENAME.into()), + ], + resolved_role.clone(), + ), + ); + } + } + + let role_config = transform_all_roles_to_config(airflow, &roles); + let validated_role_config = validate_all_roles_and_groups_config( + &dereferenced.resolved_product_image.product_version, + &role_config.context(ProductConfigTransformSnafu)?, + product_config_manager, + false, + false, + ) + .context(InvalidProductConfigSnafu)?; + + let mut role_groups = BTreeMap::new(); + let mut role_configs = BTreeMap::new(); + + for (role_name, rolegroup_configs) in validated_role_config.iter() { + let airflow_role = + AirflowRole::from_str(role_name).context(UnidentifiedAirflowRoleSnafu { + role: role_name.to_string(), + })?; + + role_configs.insert( + airflow_role.clone(), + ValidatedRoleConfig { + pdb: airflow + .role_config(&airflow_role) + .map(|rc| rc.pod_disruption_budget), + listener_class: airflow_role + .listener_class_name(airflow) + .map(|s| s.to_string()), + group_listener_name: airflow.group_listener_name(&airflow_role), + }, + ); + + let mut group_configs = BTreeMap::new(); + for (rolegroup_name, rolegroup_config) in rolegroup_configs.iter() { + let rolegroup_ref = RoleGroupRef { + cluster: stackable_operator::kube::runtime::reflector::ObjectRef::from_obj(airflow), + role: role_name.into(), + role_group: rolegroup_name.into(), + }; + + let merged_config = airflow + .merged_config(&airflow_role, &rolegroup_ref) + .context(FailedToResolveConfigSnafu)?; + + group_configs.insert( + rolegroup_name.clone(), + ValidatedRoleGroupConfig { + merged_config, + product_config_properties: rolegroup_config.clone(), + }, + ); + } + + role_groups.insert(airflow_role, group_configs); + } + + Ok(ValidatedAirflowCluster { + image: dereferenced.resolved_product_image.clone(), + role_groups, + role_configs, + executor: airflow.spec.executor.clone(), + }) +} From a958ce27d1aca2953fd53c723a82ddbe19098663 Mon Sep 17 00:00:00 2001 From: Andrew Kenworthy Date: Wed, 13 May 2026 11:41:13 +0200 Subject: [PATCH 6/9] refactor: align error variant naming with hive and hbase operators Rename ProductConfigTransform to GenerateProductConfig and fix the display string casing to match the convention used across all three dereference/validate extraction PRs. Co-Authored-By: Claude Opus 4.6 --- rust/operator-binary/src/controller/validate.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/rust/operator-binary/src/controller/validate.rs b/rust/operator-binary/src/controller/validate.rs index 12fbc2bb..da3ad2e3 100644 --- a/rust/operator-binary/src/controller/validate.rs +++ b/rust/operator-binary/src/controller/validate.rs @@ -22,8 +22,8 @@ pub enum Error { source: stackable_operator::product_config_utils::Error, }, - #[snafu(display("Failed to transform configs"))] - ProductConfigTransform { + #[snafu(display("failed to generate product config"))] + GenerateProductConfig { source: stackable_operator::product_config_utils::Error, }, @@ -89,7 +89,7 @@ pub fn validate_cluster( let role_config = transform_all_roles_to_config(airflow, &roles); let validated_role_config = validate_all_roles_and_groups_config( &dereferenced.resolved_product_image.product_version, - &role_config.context(ProductConfigTransformSnafu)?, + &role_config.context(GenerateProductConfigSnafu)?, product_config_manager, false, false, From a68b232fef82723fece32a1d2fcc18e5461b6ff1 Mon Sep 17 00:00:00 2001 From: Andrew Kenworthy Date: Wed, 13 May 2026 16:47:06 +0200 Subject: [PATCH 7/9] refactor: move product image resolution from dereference to validate MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Image resolution is a pure computation, not an I/O dereference, so it belongs in validate_cluster alongside the other config validation. This also aligns with the pattern used by the trino operator. The dereference error variants were renamed to drop the `Invalid` prefix (e.g. InvalidAuthenticationConfig → AuthenticationConfig) because removing ResolveProductImage left all remaining variants sharing the same prefix, triggering clippy::enum_variant_names. Co-Authored-By: Claude Opus 4.6 --- .../operator-binary/src/airflow_controller.rs | 23 ++++++------ .../src/controller/dereference.rs | 37 +++++-------------- .../src/controller/validate.rs | 22 ++++++++--- 3 files changed, 37 insertions(+), 45 deletions(-) diff --git a/rust/operator-binary/src/airflow_controller.rs b/rust/operator-binary/src/airflow_controller.rs index a90bcc0a..6581af35 100644 --- a/rust/operator-binary/src/airflow_controller.rs +++ b/rust/operator-binary/src/airflow_controller.rs @@ -325,15 +325,9 @@ pub async fn reconcile_airflow( let client = &ctx.client; - let dereferenced = crate::controller::dereference::dereference( - client, - airflow, - CONTAINER_IMAGE_BASE_NAME, - &ctx.operator_environment.image_repository, - crate::built_info::PKG_VERSION, - ) - .await - .context(DereferenceSnafu)?; + let dereferenced = crate::controller::dereference::dereference(client, airflow) + .await + .context(DereferenceSnafu)?; let cluster_operation_cond_builder = ClusterOperationsConditionBuilder::new(&airflow.spec.cluster_operation); @@ -375,9 +369,14 @@ pub async fn reconcile_airflow( None }; - let validated = - crate::controller::validate::validate_cluster(airflow, &dereferenced, &ctx.product_config) - .context(ValidateSnafu)?; + let validated = crate::controller::validate::validate_cluster( + airflow, + CONTAINER_IMAGE_BASE_NAME, + &ctx.operator_environment.image_repository, + crate::built_info::PKG_VERSION, + &ctx.product_config, + ) + .context(ValidateSnafu)?; let mut cluster_resources = ClusterResources::new( APP_NAME, diff --git a/rust/operator-binary/src/controller/dereference.rs b/rust/operator-binary/src/controller/dereference.rs index 24303c64..28755d43 100644 --- a/rust/operator-binary/src/controller/dereference.rs +++ b/rust/operator-binary/src/controller/dereference.rs @@ -1,8 +1,5 @@ use snafu::{ResultExt, Snafu}; -use stackable_operator::commons::{ - product_image_selection::{self, ResolvedProductImage}, - random_secret_creation, -}; +use stackable_operator::commons::random_secret_creation; use crate::crd::{ authentication::AirflowClientAuthenticationDetailsResolved, @@ -13,30 +10,24 @@ use crate::crd::{ #[derive(Snafu, Debug)] pub enum Error { - #[snafu(display("failed to resolve product image"))] - ResolveProductImage { - source: product_image_selection::Error, - }, - #[snafu(display("failed to apply authentication configuration"))] - InvalidAuthenticationConfig { + AuthenticationConfig { source: crate::crd::authentication::Error, }, #[snafu(display("invalid authorization config"))] - InvalidAuthorizationConfig { + AuthorizationConfig { source: stackable_operator::commons::opa::Error, }, #[snafu(display("failed to create internal secret"))] - InvalidInternalSecret { + InternalSecret { source: random_secret_creation::Error, }, } /// External references resolved during the dereference step. pub struct DereferencedObjects { - pub resolved_product_image: ResolvedProductImage, pub authentication_config: AirflowClientAuthenticationDetailsResolved, pub authorization_config: AirflowAuthorizationResolved, } @@ -44,22 +35,13 @@ pub struct DereferencedObjects { pub async fn dereference( client: &stackable_operator::client::Client, airflow: &v1alpha2::AirflowCluster, - image_base_name: &str, - image_repository: &str, - pkg_version: &str, ) -> Result { - let resolved_product_image = airflow - .spec - .image - .resolve(image_base_name, image_repository, pkg_version) - .context(ResolveProductImageSnafu)?; - let authentication_config = AirflowClientAuthenticationDetailsResolved::from( &airflow.spec.cluster_config.authentication, client, ) .await - .context(InvalidAuthenticationConfigSnafu)?; + .context(AuthenticationConfigSnafu)?; let authorization_config = AirflowAuthorizationResolved::from_authorization_config( client, @@ -67,7 +49,7 @@ pub async fn dereference( &airflow.spec.cluster_config.authorization, ) .await - .context(InvalidAuthorizationConfigSnafu)?; + .context(AuthorizationConfigSnafu)?; random_secret_creation::create_random_secret_if_not_exists( &airflow.shared_internal_secret_secret_name(), @@ -77,7 +59,7 @@ pub async fn dereference( client, ) .await - .context(InvalidInternalSecretSnafu)?; + .context(InternalSecretSnafu)?; random_secret_creation::create_random_secret_if_not_exists( &airflow.shared_jwt_secret_secret_name(), @@ -87,7 +69,7 @@ pub async fn dereference( client, ) .await - .context(InvalidInternalSecretSnafu)?; + .context(InternalSecretSnafu)?; // https://airflow.apache.org/docs/apache-airflow/stable/security/secrets/fernet.html#security-fernet // does not document how long the fernet key should be, but recommends using @@ -101,10 +83,9 @@ pub async fn dereference( client, ) .await - .context(InvalidInternalSecretSnafu)?; + .context(InternalSecretSnafu)?; Ok(DereferencedObjects { - resolved_product_image, authentication_config, authorization_config, }) diff --git a/rust/operator-binary/src/controller/validate.rs b/rust/operator-binary/src/controller/validate.rs index da3ad2e3..6eb2a16c 100644 --- a/rust/operator-binary/src/controller/validate.rs +++ b/rust/operator-binary/src/controller/validate.rs @@ -6,17 +6,21 @@ use std::{ use product_config::{ProductConfigManager, types::PropertyNameKind}; use snafu::{ResultExt, Snafu}; use stackable_operator::{ - commons::product_image_selection::ResolvedProductImage, + commons::product_image_selection::{self, ResolvedProductImage}, product_config_utils::{transform_all_roles_to_config, validate_all_roles_and_groups_config}, role_utils::RoleGroupRef, }; use strum::IntoEnumIterator; -use super::dereference::DereferencedObjects; use crate::crd::{AIRFLOW_CONFIG_FILENAME, AirflowConfig, AirflowExecutor, AirflowRole, v1alpha2}; #[derive(Snafu, Debug)] pub enum Error { + #[snafu(display("failed to resolve product image"))] + ResolveProductImage { + source: product_image_selection::Error, + }, + #[snafu(display("invalid product config"))] InvalidProductConfig { source: stackable_operator::product_config_utils::Error, @@ -64,9 +68,17 @@ pub struct ValidatedAirflowCluster { pub fn validate_cluster( airflow: &v1alpha2::AirflowCluster, - dereferenced: &DereferencedObjects, + image_base_name: &str, + image_repository: &str, + pkg_version: &str, product_config_manager: &ProductConfigManager, ) -> Result { + let resolved_product_image = airflow + .spec + .image + .resolve(image_base_name, image_repository, pkg_version) + .context(ResolveProductImageSnafu)?; + let mut roles = HashMap::new(); // if the kubernetes executor is specified there will be no worker role as the pods @@ -88,7 +100,7 @@ pub fn validate_cluster( let role_config = transform_all_roles_to_config(airflow, &roles); let validated_role_config = validate_all_roles_and_groups_config( - &dereferenced.resolved_product_image.product_version, + &resolved_product_image.product_version, &role_config.context(GenerateProductConfigSnafu)?, product_config_manager, false, @@ -143,7 +155,7 @@ pub fn validate_cluster( } Ok(ValidatedAirflowCluster { - image: dereferenced.resolved_product_image.clone(), + image: resolved_product_image, role_groups, role_configs, executor: airflow.spec.executor.clone(), From d510d11694ddbaae394b4db7657adec4e2dfcc5e Mon Sep 17 00:00:00 2001 From: Andrew Kenworthy Date: Wed, 13 May 2026 17:15:27 +0200 Subject: [PATCH 8/9] refactor: move secret creation from dereference to reconciler Secret creation is a side effect that should not happen before the cluster spec has been validated. Moved it to the reconciler, after validate_cluster succeeds. Co-Authored-By: Claude Opus 4.6 --- .../operator-binary/src/airflow_controller.rs | 51 ++++++++++++++++++- .../src/controller/dereference.rs | 44 +--------------- 2 files changed, 50 insertions(+), 45 deletions(-) diff --git a/rust/operator-binary/src/airflow_controller.rs b/rust/operator-binary/src/airflow_controller.rs index 6581af35..ea7dd9ad 100644 --- a/rust/operator-binary/src/airflow_controller.rs +++ b/rust/operator-binary/src/airflow_controller.rs @@ -30,7 +30,10 @@ use stackable_operator::{ }, cli::OperatorEnvironmentOptions, cluster_resources::{ClusterResourceApplyStrategy, ClusterResources}, - commons::{product_image_selection::ResolvedProductImage, rbac::build_rbac_resources}, + commons::{ + product_image_selection::ResolvedProductImage, random_secret_creation, + rbac::build_rbac_resources, + }, crd::{authentication::ldap, git_sync, listener}, database_connections::{ TemplatingMechanism, @@ -90,7 +93,11 @@ use crate::{ AirflowAuthenticationClassResolved, AirflowClientAuthenticationDetailsResolved, }, authorization::AirflowAuthorizationResolved, - build_recommended_labels, v1alpha2, + build_recommended_labels, + internal_secret::{ + FERNET_KEY_SECRET_KEY, INTERNAL_SECRET_SECRET_KEY, JWT_SECRET_SECRET_KEY, + }, + v1alpha2, }, env_vars::{self, build_airflow_template_envs}, operations::{ @@ -208,6 +215,11 @@ pub enum Error { source: stackable_operator::client::Error, }, + #[snafu(display("failed to create internal secret"))] + InternalSecret { + source: random_secret_creation::Error, + }, + #[snafu(display("failed to dereference cluster resources"))] Dereference { source: crate::controller::dereference::Error, @@ -378,6 +390,41 @@ pub async fn reconcile_airflow( ) .context(ValidateSnafu)?; + // TODO: Move secret creation to a dedicated apply step once it exists. + random_secret_creation::create_random_secret_if_not_exists( + &airflow.shared_internal_secret_secret_name(), + INTERNAL_SECRET_SECRET_KEY, + 256, + airflow, + client, + ) + .await + .context(InternalSecretSnafu)?; + + random_secret_creation::create_random_secret_if_not_exists( + &airflow.shared_jwt_secret_secret_name(), + JWT_SECRET_SECRET_KEY, + 256, + airflow, + client, + ) + .await + .context(InternalSecretSnafu)?; + + // https://airflow.apache.org/docs/apache-airflow/stable/security/secrets/fernet.html#security-fernet + // does not document how long the fernet key should be, but recommends using + // python -c "from cryptography.fernet import Fernet; print(Fernet.generate_key().decode())" + // which returns `jUm21LuA76YZmrIa9u4eXRg0h0P24MDC9IDOmDvJbfw=`, which has 44 characters, which makes 32 bytes. + random_secret_creation::create_random_secret_if_not_exists( + &airflow.shared_fernet_key_secret_name(), + FERNET_KEY_SECRET_KEY, + 32, + airflow, + client, + ) + .await + .context(InternalSecretSnafu)?; + let mut cluster_resources = ClusterResources::new( APP_NAME, OPERATOR_NAME, diff --git a/rust/operator-binary/src/controller/dereference.rs b/rust/operator-binary/src/controller/dereference.rs index 28755d43..64e38d70 100644 --- a/rust/operator-binary/src/controller/dereference.rs +++ b/rust/operator-binary/src/controller/dereference.rs @@ -1,11 +1,8 @@ use snafu::{ResultExt, Snafu}; -use stackable_operator::commons::random_secret_creation; use crate::crd::{ authentication::AirflowClientAuthenticationDetailsResolved, - authorization::AirflowAuthorizationResolved, - internal_secret::{FERNET_KEY_SECRET_KEY, INTERNAL_SECRET_SECRET_KEY, JWT_SECRET_SECRET_KEY}, - v1alpha2, + authorization::AirflowAuthorizationResolved, v1alpha2, }; #[derive(Snafu, Debug)] @@ -19,11 +16,6 @@ pub enum Error { AuthorizationConfig { source: stackable_operator::commons::opa::Error, }, - - #[snafu(display("failed to create internal secret"))] - InternalSecret { - source: random_secret_creation::Error, - }, } /// External references resolved during the dereference step. @@ -51,40 +43,6 @@ pub async fn dereference( .await .context(AuthorizationConfigSnafu)?; - random_secret_creation::create_random_secret_if_not_exists( - &airflow.shared_internal_secret_secret_name(), - INTERNAL_SECRET_SECRET_KEY, - 256, - airflow, - client, - ) - .await - .context(InternalSecretSnafu)?; - - random_secret_creation::create_random_secret_if_not_exists( - &airflow.shared_jwt_secret_secret_name(), - JWT_SECRET_SECRET_KEY, - 256, - airflow, - client, - ) - .await - .context(InternalSecretSnafu)?; - - // https://airflow.apache.org/docs/apache-airflow/stable/security/secrets/fernet.html#security-fernet - // does not document how long the fernet key should be, but recommends using - // python -c "from cryptography.fernet import Fernet; print(Fernet.generate_key().decode())" - // which returns `jUm21LuA76YZmrIa9u4eXRg0h0P24MDC9IDOmDvJbfw=`, which has 44 characters, which makes 32 bytes. - random_secret_creation::create_random_secret_if_not_exists( - &airflow.shared_fernet_key_secret_name(), - FERNET_KEY_SECRET_KEY, - 32, - airflow, - client, - ) - .await - .context(InternalSecretSnafu)?; - Ok(DereferencedObjects { authentication_config, authorization_config, From 2f428aac37ff93ba7032e59e2519576368d51683 Mon Sep 17 00:00:00 2001 From: Andrew Kenworthy Date: Wed, 13 May 2026 17:19:47 +0200 Subject: [PATCH 9/9] changelog --- CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 548e7b68..9f29799a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -22,6 +22,7 @@ - BREAKING: Renamed and moved the `celeryExecutor` broker and results backend to `clusterConfig` ([#786]). The results backend `spec.celeryExecutors.resultBackend` is now `spec.clusterConfig.celeryResultsBackend`. The broker `spec.celeryExecutors.broker` is now `spec.clusterConfig.celeryBroker`. +- Internal operator refactoring: introduce dereference() and validate() steps in the reconciler ([#795]). ### Fixed @@ -37,6 +38,7 @@ [#754]: https://github.com/stackabletech/airflow-operator/pull/754 [#784]: https://github.com/stackabletech/airflow-operator/pull/784 [#786]: https://github.com/stackabletech/airflow-operator/pull/786 +[#795]: https://github.com/stackabletech/airflow-operator/pull/795 ## [26.3.0] - 2026-03-16