Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
312224b
refactor(app): scaffold dereference and validate submodules
maltesander May 20, 2026
c2cbddc
refactor(app): add dereference module with template/S3/logdir fetch
maltesander May 20, 2026
a7ae799
refactor(app): add validate module with image + product-config valida…
maltesander May 20, 2026
8b60c77
refactor(app): wire reconcile through dereference + validate
maltesander May 20, 2026
746a3ca
chore(history): rename history_controller.rs to controller.rs
maltesander May 20, 2026
aba4a07
refactor(history): scaffold dereference and validate submodules
maltesander May 20, 2026
f343b86
refactor(history): add dereference module with log directory fetch
maltesander May 20, 2026
730c61c
refactor(history): add validate module with image + product-config va…
maltesander May 20, 2026
8210bb1
refactor(history): wire reconcile through dereference + validate
maltesander May 20, 2026
5baf8ff
refactor(connect): scaffold dereference and validate submodules
maltesander May 20, 2026
82d495c
refactor(connect): add dereference module with S3 connection fetch
maltesander May 20, 2026
f8f8a67
refactor(connect): add validate module with image + role config resol…
maltesander May 20, 2026
99b4a64
refactor(connect): wire reconcile through dereference + validate
maltesander May 20, 2026
8bfce40
style: apply cargo fmt to wired controllers
maltesander May 20, 2026
32ed5f0
fix: correct error message
maltesander May 20, 2026
d731d13
fix: remove obsolete errors
maltesander May 20, 2026
7cc6cd2
fix: regenerate crate hashes
maltesander May 20, 2026
56c0802
test(spark-connect): trim 10-assert to bare CR Available wait
maltesander May 20, 2026
4d78418
test(smoke): add SparkHistoryServer resource snapshot assert
maltesander May 20, 2026
fa03bde
test(smoke): add SparkHistoryServer ConfigMap data snapshot
maltesander May 20, 2026
d884f61
test(smoke): add SparkApplication resource snapshot assert
maltesander May 20, 2026
c1af068
test(smoke): add SparkApplication ConfigMap data snapshot
maltesander May 20, 2026
ecae1c4
test(spark-connect): add SparkConnectServer resource snapshot assert
maltesander May 20, 2026
f335abb
test(spark-connect): add SparkConnectServer ConfigMap data snapshot
maltesander May 20, 2026
3b68fae
docs: adapt changelog
maltesander May 21, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ All notable changes to this project will be documented in this file.
- `SparkApplication`: `spark-env.sh` and `security.properties`
- `SparkHistoryServer`: `spark-defaults.conf`, `spark-env.sh` and `security.properties`
- `SparkConnectServer`: `spark-defaults.conf`, `metrics.properties` and `security.properties`
- Internal operator refactoring: introduce dereference() and validate() steps in the reconciler for spark application, spark connect and spark history server([#687]).

Previously, arbitrary file names were silently accepted and ignored ([#679]).
- Bump `stackable-operator` to 0.110.1 ([#679]).
Expand All @@ -27,6 +28,7 @@ All notable changes to this project will be documented in this file.
[#679]: https://github.com/stackabletech/spark-k8s-operator/pull/679
[#680]: https://github.com/stackabletech/spark-k8s-operator/pull/680
[#684]: https://github.com/stackabletech/spark-k8s-operator/pull/684
[#687]: https://github.com/stackabletech/spark-k8s-operator/pull/687

## [26.3.0] - 2026-03-16

Expand Down
18 changes: 9 additions & 9 deletions Cargo.nix

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 9 additions & 9 deletions crate-hashes.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

89 changes: 39 additions & 50 deletions rust/operator-binary/src/connect/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::sync::Arc;
use snafu::{ResultExt, Snafu};
use stackable_operator::{
cluster_resources::{ClusterResourceApplyStrategy, ClusterResources},
commons::{product_image_selection, rbac::build_rbac_resources},
commons::rbac::build_rbac_resources,
kube::{
Resource, ResourceExt,
core::{DeserializeGuard, error_boundary},
Expand All @@ -21,10 +21,13 @@ use strum::{EnumDiscriminants, IntoStaticStr};
use super::crd::{CONNECT_APP_NAME, CONNECT_CONTROLLER_NAME, v1alpha1};
use crate::{
Ctx,
connect::{common, crd::SparkConnectServerStatus, executor, s3, server, service},
crd::constants::{CONTAINER_IMAGE_BASE_NAME, OPERATOR_NAME},
connect::{common, crd::SparkConnectServerStatus, executor, server, service},
crd::constants::OPERATOR_NAME,
};

pub mod dereference;
pub mod validate;

#[derive(Snafu, Debug, EnumDiscriminants)]
#[strum_discriminants(derive(IntoStaticStr))]
#[allow(clippy::enum_variant_names)]
Expand Down Expand Up @@ -124,11 +127,6 @@ pub enum Error {
BuildRbacResources {
source: stackable_operator::commons::rbac::Error,
},
#[snafu(display("failed to build connect server configuration"))]
ServerConfig { source: crate::connect::crd::Error },

#[snafu(display("failed to build connect executor configuration"))]
ExecutorConfig { source: crate::connect::crd::Error },

#[snafu(display("failed to build connect executor pod template"))]
ExecutorPodTemplate {
Expand All @@ -138,16 +136,14 @@ pub enum Error {
#[snafu(display("failed to serialize executor pod template"))]
ExecutorPodTemplateSerde { source: serde_yaml::Error },

#[snafu(display("failed to resolve product image"))]
ResolveProductImage {
source: product_image_selection::Error,
},

#[snafu(display("failed to resolve S3 connections for SparkConnectServer {name:?}"))]
ResolveS3Connections { source: s3::Error, name: String },

#[snafu(display("failed to build connect server S3 properties"))]
S3SparkProperties { source: crate::connect::s3::Error },

#[snafu(display("failed to dereference SparkConnectServer"))]
DereferenceSparkConnectServer { source: dereference::Error },

#[snafu(display("failed to validate SparkConnectServer"))]
ValidateSparkConnectServer { source: validate::Error },
}

type Result<T, E = Error> = std::result::Result<T, E>;
Expand All @@ -170,12 +166,22 @@ pub async fn reconcile(
.map_err(error_boundary::InvalidObject::clone)
.context(InvalidSparkConnectServerSnafu)?;

let server_config = scs.server_config().context(ServerConfigSnafu)?;
let server_role_config = &scs.spec.server.role_config;
let executor_config = scs.executor_config().context(ExecutorConfigSnafu)?;

let client = &ctx.client;

let dereferenced = dereference::dereference(client, scs)
.await
.context(DereferenceSparkConnectServerSnafu)?;

let validated = validate::validate(scs, dereferenced, &ctx.operator_environment)
.context(ValidateSparkConnectServerSnafu)?;

let server_config = &validated.server_config;
let executor_config = &validated.executor_config;
let resolved_product_image = &validated.resolved_product_image;
let resolved_s3 = &validated.dereferenced.resolved_s3;

let server_role_config = &scs.spec.server.role_config;

let mut cluster_resources = ClusterResources::new(
CONNECT_APP_NAME,
OPERATOR_NAME,
Expand All @@ -186,23 +192,6 @@ pub async fn reconcile(
)
.context(CreateClusterResourcesSnafu)?;

let resolved_product_image = scs
.spec
.image
.resolve(
CONTAINER_IMAGE_BASE_NAME,
&ctx.operator_environment.image_repository,
crate::built_info::PKG_VERSION,
)
.context(ResolveProductImageSnafu)?;

// Resolve any S3 connections early to fail fast if there are issues.
let resolved_s3 = s3::ResolvedS3::resolve(client, scs)
.await
.with_context(|_| ResolveS3ConnectionsSnafu {
name: scs.name_unchecked(),
})?;

// Use a dedicated service account for connect server pods.
let (service_account, role_binding) = build_rbac_resources(
scs,
Expand Down Expand Up @@ -251,21 +240,21 @@ pub async fn reconcile(
.context(S3SparkPropertiesSnafu)?,
server::server_properties(
scs,
&server_config,
server_config,
&applied_headless_service,
&service_account,
&resolved_product_image,
resolved_product_image,
)
.context(ServerPropertiesSnafu)?,
executor::executor_properties(scs, &executor_config, &resolved_product_image)
executor::executor_properties(scs, executor_config, resolved_product_image)
.context(ExecutorPropertiesSnafu)?,
])
.context(SerializePropertiesSnafu)?;

// ========================================
// Executor config map and pod template
let executor_config_map =
executor::executor_config_map(scs, &executor_config, &resolved_product_image).context(
executor::executor_config_map(scs, executor_config, resolved_product_image).context(
BuildExecutorConfigMapSnafu {
name: scs.name_unchecked(),
},
Expand All @@ -280,10 +269,10 @@ pub async fn reconcile(
let executor_pod_template = serde_yaml::to_string(
&executor::executor_pod_template(
scs,
&executor_config,
&resolved_product_image,
executor_config,
resolved_product_image,
&executor_config_map,
&resolved_s3,
resolved_s3,
)
.context(ExecutorPodTemplateSnafu)?,
)
Expand All @@ -293,8 +282,8 @@ pub async fn reconcile(
// Server config map
let server_config_map = server::server_config_map(
scs,
&server_config,
&resolved_product_image,
server_config,
resolved_product_image,
&spark_props,
&executor_pod_template,
)
Expand All @@ -310,7 +299,7 @@ pub async fn reconcile(

// ========================================
// Server listener
let listener = server::build_listener(scs, server_role_config, &resolved_product_image)
let listener = server::build_listener(scs, server_role_config, resolved_product_image)
.context(BuildListenerSnafu)?;

let applied_listener = cluster_resources
Expand All @@ -323,13 +312,13 @@ pub async fn reconcile(
let args = server::command_args(&scs.spec.args);
let stateful_set = server::build_stateful_set(
scs,
&server_config,
&resolved_product_image,
server_config,
resolved_product_image,
&service_account,
&server_config_map,
&applied_listener.name_any(),
args,
&resolved_s3,
resolved_s3,
)
.context(BuildServerStatefulSetSnafu)?;

Expand Down
33 changes: 33 additions & 0 deletions rust/operator-binary/src/connect/controller/dereference.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
//! The dereference step in the SparkConnectServer controller.
//!
//! Fetches the resolved S3 configuration referenced by the SparkConnectServer spec.

use snafu::{ResultExt, Snafu};
use stackable_operator::{client::Client, kube::ResourceExt};

use crate::connect::{crd::v1alpha1, s3};

#[derive(Snafu, Debug)]
pub enum Error {
#[snafu(display("failed to resolve S3 connection for {name}"))]
ResolveS3Connections { source: s3::Error, name: String },
}

type Result<T, E = Error> = std::result::Result<T, E>;

pub struct DereferencedSparkConnectServer {
pub resolved_s3: s3::ResolvedS3,
}

pub async fn dereference(
client: &Client,
scs: &v1alpha1::SparkConnectServer,
) -> Result<DereferencedSparkConnectServer> {
let resolved_s3 = s3::ResolvedS3::resolve(client, scs)
.await
.with_context(|_| ResolveS3ConnectionsSnafu {
name: scs.name_unchecked(),
})?;

Ok(DereferencedSparkConnectServer { resolved_s3 })
}
Loading
Loading