From bcfd087a45e12eea58affdea3f493b148657fb3f Mon Sep 17 00:00:00 2001 From: GatewayJ <835269233@qq.com> Date: Sun, 24 May 2026 14:30:45 +0800 Subject: [PATCH] feat: add pool decommission lifecycle --- console-web/lib/api.ts | 24 + console-web/types/api.ts | 28 + deploy/rustfs-operator/crds/tenant-crd.yaml | 140 +++ deploy/rustfs-operator/crds/tenant.yaml | 140 +++ src/console/handlers/pools.rs | 701 ++++++++++- src/console/models/pool.rs | 37 + src/console/openapi.rs | 62 +- src/console/routes/mod.rs | 8 + src/context.rs | 18 +- src/reconcile.rs | 33 +- src/reconcile/phases.rs | 356 +++++- src/reconcile/pool_lifecycle.rs | 1236 +++++++++++++++++++ src/sts/rustfs_client.rs | 312 ++++- src/types/v1alpha1.rs | 1 + src/types/v1alpha1/pool_lifecycle.rs | 79 ++ src/types/v1alpha1/status.rs | 14 + src/types/v1alpha1/status/pool.rs | 142 +++ src/types/v1alpha1/tenant.rs | 11 +- src/types/v1alpha1/tenant/workloads.rs | 40 +- 19 files changed, 3333 insertions(+), 49 deletions(-) create mode 100644 src/reconcile/pool_lifecycle.rs create mode 100644 src/types/v1alpha1/pool_lifecycle.rs diff --git a/console-web/lib/api.ts b/console-web/lib/api.ts index d1c24dd..5c686e1 100644 --- a/console-web/lib/api.ts +++ b/console-web/lib/api.ts @@ -9,9 +9,12 @@ import type { PoolDetails, AddPoolRequest, AddPoolResponse, + CancelPoolDecommissionRequest, DeletePoolResponse, + PoolDecommissionRequestResponse, PodListResponse, PodDetails, + StartPoolDecommissionRequest, DeletePodResponse, NodeListResponse, NamespaceListResponse, @@ -120,6 +123,27 @@ export async function deletePool(namespace: string, tenantName: string, poolName return apiClient.delete(`${pool(namespace, tenantName, poolName)}`) } +export async function startPoolDecommission( + namespace: string, + tenantName: string, + poolName: string, + body: StartPoolDecommissionRequest, +): Promise { + return apiClient.post(`${pool(namespace, tenantName, poolName)}/decommission`, body) +} + +export async function cancelPoolDecommission( + namespace: string, + tenantName: string, + poolName: string, + body: CancelPoolDecommissionRequest, +): Promise { + return apiClient.post( + `${pool(namespace, tenantName, poolName)}/decommission/cancel`, + body, + ) +} + // ----- Pods ----- export async function listPods(namespace: string, tenantName: string): Promise { return apiClient.get(`${pods(namespace, tenantName)}`) diff --git a/console-web/types/api.ts b/console-web/types/api.ts index 4f3bd1c..104d998 100644 --- a/console-web/types/api.ts +++ b/console-web/types/api.ts @@ -163,6 +163,16 @@ export interface PoolDetails { current_revision: string | null update_revision: string | null state: string + lifecycle_state: string | null + workload_state: string | null + decommission_phase: string | null + decommission_objects_migrated: number | null + decommission_bytes_migrated: number | null + decommission_objects_failed: number | null + decommission_bytes_failed: number | null + decommission_cleanup_state: string | null + decommission_last_error: string | null + decommission_last_poll_time: string | null created_at: string | null } @@ -195,6 +205,24 @@ export interface DeletePoolResponse { warning?: string } +export interface StartPoolDecommissionRequest { + requestId: string + reason?: string +} + +export interface CancelPoolDecommissionRequest { + requestId: string + reason?: string +} + +export interface PoolDecommissionRequestResponse { + success: boolean + message: string + poolName: string + requestId: string + action: string +} + // ----- Pod ----- export interface PodListItem { name: string diff --git a/deploy/rustfs-operator/crds/tenant-crd.yaml b/deploy/rustfs-operator/crds/tenant-crd.yaml index b7556d5..967db03 100644 --- a/deploy/rustfs-operator/crds/tenant-crd.yaml +++ b/deploy/rustfs-operator/crds/tenant-crd.yaml @@ -422,6 +422,45 @@ spec: - null nullable: true type: string + poolLifecycle: + description: Explicit lifecycle requests for pool decommissioning. + nullable: true + properties: + decommissionRequests: + items: + properties: + action: + enum: + - Start + - Cancel + type: string + cancelRequestedAt: + nullable: true + type: string + poolName: + type: string + reason: + nullable: true + type: string + requestId: + type: string + requestedAt: + nullable: true + type: string + required: + - action + - poolName + - requestId + type: object + type: array + pvcRetentionPolicy: + enum: + - Retain + type: string + type: object + x-kubernetes-validations: + - message: decommissionRequests must contain at most one entry per poolName + rule: '!has(self.decommissionRequests) || self.decommissionRequests.all(r, self.decommissionRequests.exists_one(other, other.poolName == r.poolName))' pools: items: description: |- @@ -1603,10 +1642,106 @@ spec: description: Current revision hash of the StatefulSet nullable: true type: string + decommission: + description: Decommission progress and cleanup status for this pool. + nullable: true + properties: + cleanup: + nullable: true + properties: + pvcRetentionPolicy: + nullable: true + type: string + state: + enum: + - Pending + - StatefulSetDeleting + - PvcRetained + type: string + statefulSetDeletedAt: + nullable: true + type: string + required: + - state + type: object + completedAt: + nullable: true + type: string + endpointSetHash: + nullable: true + type: string + lastError: + nullable: true + properties: + message: + nullable: true + type: string + reason: + nullable: true + type: string + type: object + lastPollTime: + nullable: true + type: string + phase: + enum: + - Pending + - Running + - Complete + - Canceled + - Failed + - null + nullable: true + type: string + progress: + nullable: true + properties: + bytesFailed: + format: int64 + nullable: true + type: integer + bytesMigrated: + format: int64 + nullable: true + type: integer + objectsFailed: + format: int64 + nullable: true + type: integer + objectsMigrated: + format: int64 + nullable: true + type: integer + type: object + requestId: + nullable: true + type: string + rustfsPoolId: + nullable: true + type: string + startedAt: + nullable: true + type: string + type: object lastUpdateTime: description: Last time the pool status was updated nullable: true type: string + lifecycleState: + description: Lifecycle state of the pool, separate from StatefulSet rollout state. + enum: + - Active + - Decommissioning + - Decommissioned + - DecommissionCanceled + - DecommissionFailed + - null + nullable: true + type: string + name: + description: Pool name from Tenant spec. Optional for backward compatibility with older status. + nullable: true + type: string readyReplicas: description: Number of pods with Ready condition format: int32 @@ -1632,6 +1767,10 @@ spec: format: int32 nullable: true type: integer + workloadState: + description: Workload rollout state of this pool. Mirrors `state` for compatibility. + nullable: true + type: string required: - ssName - state @@ -1650,3 +1789,4 @@ spec: storage: true subresources: status: {} +--- diff --git a/deploy/rustfs-operator/crds/tenant.yaml b/deploy/rustfs-operator/crds/tenant.yaml index b7556d5..967db03 100755 --- a/deploy/rustfs-operator/crds/tenant.yaml +++ b/deploy/rustfs-operator/crds/tenant.yaml @@ -422,6 +422,45 @@ spec: - null nullable: true type: string + poolLifecycle: + description: Explicit lifecycle requests for pool decommissioning. + nullable: true + properties: + decommissionRequests: + items: + properties: + action: + enum: + - Start + - Cancel + type: string + cancelRequestedAt: + nullable: true + type: string + poolName: + type: string + reason: + nullable: true + type: string + requestId: + type: string + requestedAt: + nullable: true + type: string + required: + - action + - poolName + - requestId + type: object + type: array + pvcRetentionPolicy: + enum: + - Retain + type: string + type: object + x-kubernetes-validations: + - message: decommissionRequests must contain at most one entry per poolName + rule: '!has(self.decommissionRequests) || self.decommissionRequests.all(r, self.decommissionRequests.exists_one(other, other.poolName == r.poolName))' pools: items: description: |- @@ -1603,10 +1642,106 @@ spec: description: Current revision hash of the StatefulSet nullable: true type: string + decommission: + description: Decommission progress and cleanup status for this pool. + nullable: true + properties: + cleanup: + nullable: true + properties: + pvcRetentionPolicy: + nullable: true + type: string + state: + enum: + - Pending + - StatefulSetDeleting + - PvcRetained + type: string + statefulSetDeletedAt: + nullable: true + type: string + required: + - state + type: object + completedAt: + nullable: true + type: string + endpointSetHash: + nullable: true + type: string + lastError: + nullable: true + properties: + message: + nullable: true + type: string + reason: + nullable: true + type: string + type: object + lastPollTime: + nullable: true + type: string + phase: + enum: + - Pending + - Running + - Complete + - Canceled + - Failed + - null + nullable: true + type: string + progress: + nullable: true + properties: + bytesFailed: + format: int64 + nullable: true + type: integer + bytesMigrated: + format: int64 + nullable: true + type: integer + objectsFailed: + format: int64 + nullable: true + type: integer + objectsMigrated: + format: int64 + nullable: true + type: integer + type: object + requestId: + nullable: true + type: string + rustfsPoolId: + nullable: true + type: string + startedAt: + nullable: true + type: string + type: object lastUpdateTime: description: Last time the pool status was updated nullable: true type: string + lifecycleState: + description: Lifecycle state of the pool, separate from StatefulSet rollout state. + enum: + - Active + - Decommissioning + - Decommissioned + - DecommissionCanceled + - DecommissionFailed + - null + nullable: true + type: string + name: + description: Pool name from Tenant spec. Optional for backward compatibility with older status. + nullable: true + type: string readyReplicas: description: Number of pods with Ready condition format: int32 @@ -1632,6 +1767,10 @@ spec: format: int32 nullable: true type: integer + workloadState: + description: Workload rollout state of this pool. Mirrors `state` for compatibility. + nullable: true + type: string required: - ssName - state @@ -1650,3 +1789,4 @@ spec: storage: true subresources: status: {} +--- diff --git a/src/console/handlers/pools.rs b/src/console/handlers/pools.rs index 30167b4..d31ee4b 100755 --- a/src/console/handlers/pools.rs +++ b/src/console/handlers/pools.rs @@ -25,7 +25,11 @@ use crate::console::{ use crate::types::v1alpha1::{ persistence::PersistenceConfig, pool::{Pool, SchedulingConfig, validate_pool_total_volumes}, + pool_lifecycle::{ + DecommissionAction, DecommissionRequest, PoolLifecycleSpec, PvcRetentionPolicy, + }, status::next_actions_for_reason, + status::pool::PoolLifecycleState, tenant::Tenant, }; @@ -94,6 +98,7 @@ enum PoolDeleteDecision { const REASON_DECOMMISSION_REQUIRED: &str = "DecommissionRequired"; const REASON_OBSERVATION_STALE: &str = "ObservedGenerationStale"; +const REASON_DECOMMISSION_REQUEST_CONFLICT: &str = "DecommissionRequestConflict"; fn action_strings(reason: &str) -> Vec { next_actions_for_reason(reason) @@ -115,11 +120,11 @@ fn ensure_pool_delete_does_not_remove_last_pool(total_pools: usize) -> Result<() fn classify_pool_delete( total_pools: usize, managed_statefulset_exists: bool, - recorded_pool_status_exists: bool, + recorded_pool_status_requires_decommission: bool, ) -> Result { ensure_pool_delete_does_not_remove_last_pool(total_pools)?; - if managed_statefulset_exists || recorded_pool_status_exists { + if managed_statefulset_exists || recorded_pool_status_requires_decommission { return Ok(PoolDeleteDecision::RequiresDecommission { reason: REASON_DECOMMISSION_REQUIRED, }); @@ -128,6 +133,206 @@ fn classify_pool_delete( Ok(PoolDeleteDecision::RemoveFromSpec) } +fn pool_status_matches( + pool_status: &crate::types::v1alpha1::status::pool::Pool, + pool_name: &str, + ss_name: &str, +) -> bool { + pool_status.name.as_deref() == Some(pool_name) || pool_status.ss_name == ss_name +} + +fn pool_status_requires_decommission_before_spec_removal( + pool_status: &crate::types::v1alpha1::status::pool::Pool, +) -> bool { + !matches!( + pool_status.lifecycle_state, + Some(PoolLifecycleState::Decommissioned) + ) +} + +fn has_recorded_pool_status_requiring_decommission( + tenant: &Tenant, + pool_name: &str, + ss_name: &str, +) -> bool { + tenant.status.as_ref().is_some_and(|status| { + status + .pools + .iter() + .filter(|pool_status| pool_status_matches(pool_status, pool_name, ss_name)) + .any(pool_status_requires_decommission_before_spec_removal) + }) +} + +fn recorded_pool_lifecycle_state(tenant: &Tenant, pool_name: &str) -> Option { + let ss_name = format!("{}-{}", tenant.name(), pool_name); + tenant.status.as_ref().and_then(|status| { + status + .pools + .iter() + .find(|pool_status| pool_status_matches(pool_status, pool_name, &ss_name)) + .and_then(|pool_status| pool_status.lifecycle_state.clone()) + }) +} + +fn decommission_request_can_replace( + tenant: &Tenant, + pool_name: &str, + existing: &DecommissionRequest, + request_id: &str, + action: &DecommissionAction, +) -> bool { + if existing.request_id == request_id && &existing.action == action { + return true; + } + + if action == &DecommissionAction::Cancel { + return existing.action == DecommissionAction::Start + || matches!( + recorded_pool_lifecycle_state(tenant, pool_name), + Some(PoolLifecycleState::Decommissioning) + ); + } + + matches!( + recorded_pool_lifecycle_state(tenant, pool_name), + Some(PoolLifecycleState::DecommissionCanceled | PoolLifecycleState::DecommissionFailed) + ) +} + +fn decommission_cancel_is_allowed( + tenant: &Tenant, + pool_name: &str, + existing: Option<&DecommissionRequest>, +) -> bool { + existing.is_some_and(|request| { + matches!( + request.action, + DecommissionAction::Start | DecommissionAction::Cancel + ) + }) || matches!( + recorded_pool_lifecycle_state(tenant, pool_name), + Some(PoolLifecycleState::Decommissioning) + ) +} + +fn decommission_cancel_not_allowed_error( + namespace: &str, + tenant_name: &str, + pool_name: &str, +) -> Error { + Error::ActionRequired { + status: StatusCode::CONFLICT, + code: "PoolDecommissionCancelNotAllowed".to_string(), + reason: REASON_DECOMMISSION_REQUEST_CONFLICT.to_string(), + message: format!( + "Pool '{}' does not have an active decommission request to cancel.", + pool_name + ), + next_actions: vec!["inspectPoolStatus".to_string()], + details: Some(Box::new(ConsoleErrorDetails { + namespace: Some(namespace.to_string()), + tenant: Some(tenant_name.to_string()), + resource: Some(pool_name.to_string()), + })), + } +} + +fn now_rfc3339() -> String { + chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Secs, true) +} + +fn validate_lifecycle_request_id(request_id: &str) -> Result<()> { + if request_id.trim().is_empty() { + return Err(Error::BadRequest { + message: "requestID must not be empty".to_string(), + }); + } + + Ok(()) +} + +fn decommission_request_conflict_error( + namespace: &str, + tenant_name: &str, + pool_name: &str, +) -> Error { + Error::ActionRequired { + status: StatusCode::CONFLICT, + code: "PoolDecommissionRequestConflict".to_string(), + reason: REASON_DECOMMISSION_REQUEST_CONFLICT.to_string(), + message: format!( + "Pool '{}' already has a different decommission request. Wait for it to finish or clear the current request.", + pool_name + ), + next_actions: vec!["inspectPoolStatus".to_string()], + details: Some(Box::new(ConsoleErrorDetails { + namespace: Some(namespace.to_string()), + tenant: Some(tenant_name.to_string()), + resource: Some(pool_name.to_string()), + })), + } +} + +fn upsert_decommission_request( + tenant: &mut Tenant, + pool_name: &str, + request_id: &str, + action: DecommissionAction, + reason: Option, +) -> Result<()> { + validate_lifecycle_request_id(request_id)?; + + let lifecycle = tenant + .spec + .pool_lifecycle + .get_or_insert_with(|| PoolLifecycleSpec { + pvc_retention_policy: PvcRetentionPolicy::Retain, + decommission_requests: Vec::new(), + }); + + if let Some(existing) = lifecycle + .decommission_requests + .iter_mut() + .find(|request| request.pool_name == pool_name) + { + existing.request_id = request_id.to_string(); + existing.action = action.clone(); + existing.reason = reason; + if action == DecommissionAction::Cancel { + existing.cancel_requested_at = Some(now_rfc3339()); + } else { + existing.requested_at = Some(now_rfc3339()); + existing.cancel_requested_at = None; + } + return Ok(()); + } + + lifecycle.decommission_requests.push(DecommissionRequest { + pool_name: pool_name.to_string(), + request_id: request_id.to_string(), + action: action.clone(), + requested_at: Some(now_rfc3339()), + cancel_requested_at: (action == DecommissionAction::Cancel).then(now_rfc3339), + reason, + }); + + Ok(()) +} + +fn remove_decommission_request(tenant: &mut Tenant, pool_name: &str) { + let Some(lifecycle) = tenant.spec.pool_lifecycle.as_mut() else { + return; + }; + + lifecycle + .decommission_requests + .retain(|request| request.pool_name != pool_name); + if lifecycle.decommission_requests.is_empty() { + tenant.spec.pool_lifecycle = None; + } +} + fn is_pool_observation_current(tenant: &Tenant) -> bool { tenant .status @@ -302,6 +507,26 @@ pub async fn list_pools( }) }); + let recorded_pool_status = tenant.status.as_ref().and_then(|status| { + status + .pools + .iter() + .find(|pool_status| pool_status_matches(pool_status, &pool.name, &ss_name)) + }); + let decommission = recorded_pool_status.and_then(|status| status.decommission.as_ref()); + let progress = decommission.and_then(|status| status.progress.as_ref()); + let cleanup = decommission.and_then(|status| status.cleanup.as_ref()); + let decommission_last_error = decommission + .and_then(|status| status.last_error.as_ref()) + .and_then( + |last_error| match (&last_error.reason, &last_error.message) { + (Some(reason), Some(message)) => Some(format!("{reason}: {message}")), + (Some(reason), None) => Some(reason.clone()), + (None, Some(message)) => Some(message.clone()), + (None, None) => None, + }, + ); + pools_details.push(PoolDetails { name: pool.name.clone(), servers: pool.servers, @@ -315,6 +540,23 @@ pub async fn list_pools( current_revision, update_revision, state, + lifecycle_state: recorded_pool_status + .and_then(|status| status.lifecycle_state.as_ref()) + .map(ToString::to_string), + workload_state: recorded_pool_status + .and_then(|status| status.workload_state.as_ref()) + .map(ToString::to_string), + decommission_phase: decommission + .and_then(|status| status.phase.as_ref()) + .map(ToString::to_string), + decommission_objects_migrated: progress.and_then(|progress| progress.objects_migrated), + decommission_bytes_migrated: progress.and_then(|progress| progress.bytes_migrated), + decommission_objects_failed: progress.and_then(|progress| progress.objects_failed), + decommission_bytes_failed: progress.and_then(|progress| progress.bytes_failed), + decommission_cleanup_state: cleanup.map(|cleanup| cleanup.state.to_string()), + decommission_last_error, + decommission_last_poll_time: decommission + .and_then(|status| status.last_poll_time.clone()), created_at: ss.and_then(|s| { s.metadata .creation_timestamp @@ -445,6 +687,7 @@ pub async fn add_pool( }); } + remove_decommission_request(&mut tenant, pool_name); tenant.spec.pools.push(new_pool.clone()); match tenant_api @@ -468,6 +711,16 @@ pub async fn add_pool( current_revision: None, update_revision: None, state: "Creating".to_string(), + lifecycle_state: None, + workload_state: None, + decommission_phase: None, + decommission_objects_migrated: None, + decommission_bytes_migrated: None, + decommission_objects_failed: None, + decommission_bytes_failed: None, + decommission_cleanup_state: None, + decommission_last_error: None, + decommission_last_poll_time: None, created_at: t.metadata.creation_timestamp.map(|ts| ts.0.to_rfc3339()), }, })); @@ -486,6 +739,146 @@ pub async fn add_pool( })) } +async fn write_pool_decommission_request( + namespace: String, + tenant_name: String, + pool_name: String, + claims: Claims, + request_id: String, + action: DecommissionAction, + reason: Option, +) -> Result> { + validate_lifecycle_request_id(&request_id)?; + + let client = create_client(&claims).await?; + let tenant_api: Api = Api::namespaced(client, &namespace); + + const MAX_RETRIES: u32 = 3; + let mut last_conflict = None; + for _ in 0..MAX_RETRIES { + let mut tenant = tenant_api + .get(&tenant_name) + .await + .map_err(|e| error::map_kube_error(e, format!("Tenant '{}'", tenant_name)))?; + + if !tenant.spec.pools.iter().any(|pool| pool.name == pool_name) { + return Err(Error::NotFound { + resource: format!("Pool '{}'", pool_name), + }); + } + + if action == DecommissionAction::Start { + ensure_pool_delete_does_not_remove_last_pool(tenant.spec.pools.len())?; + } + + let existing_decommission_request = tenant + .spec + .pool_lifecycle + .as_ref() + .and_then(|lifecycle| lifecycle.request_for_pool(&pool_name)); + + if action == DecommissionAction::Cancel + && !decommission_cancel_is_allowed(&tenant, &pool_name, existing_decommission_request) + { + return Err(decommission_cancel_not_allowed_error( + &namespace, + &tenant_name, + &pool_name, + )); + } + + if let Some(existing) = existing_decommission_request + && !decommission_request_can_replace( + &tenant, + &pool_name, + existing, + &request_id, + &action, + ) + { + return Err(decommission_request_conflict_error( + &namespace, + &tenant_name, + &pool_name, + )); + } + + upsert_decommission_request( + &mut tenant, + &pool_name, + &request_id, + action.clone(), + reason.clone(), + )?; + + match tenant_api + .replace(&tenant_name, &Default::default(), &tenant) + .await + { + Ok(_) => { + let action_label = action.to_string(); + return Ok(Json(PoolDecommissionRequestResponse { + success: true, + message: format!( + "Pool '{}' decommission {} request '{}' was accepted", + pool_name, action_label, request_id + ), + pool_name, + request_id, + action: action_label, + })); + } + Err(e) => { + let mapped = error::map_kube_error(e, String::new()); + if !matches!(&mapped, Error::Conflict { .. }) { + return Err(mapped); + } + last_conflict = Some(mapped); + } + } + } + + Err(last_conflict.unwrap_or_else(|| Error::Conflict { + message: "Resource was modified by another request, please retry".to_string(), + })) +} + +/// Write a Start decommission lifecycle request for a pool. +pub async fn start_pool_decommission( + Path((namespace, tenant_name, pool_name)): Path<(String, String, String)>, + Extension(claims): Extension, + Json(req): Json, +) -> Result> { + write_pool_decommission_request( + namespace, + tenant_name, + pool_name, + claims, + req.request_id, + DecommissionAction::Start, + req.reason, + ) + .await +} + +/// Write a Cancel decommission lifecycle request for a pool. +pub async fn cancel_pool_decommission( + Path((namespace, tenant_name, pool_name)): Path<(String, String, String)>, + Extension(claims): Extension, + Json(req): Json, +) -> Result> { + write_pool_decommission_request( + namespace, + tenant_name, + pool_name, + claims, + req.request_id, + DecommissionAction::Cancel, + req.reason, + ) + .await +} + /// Remove a pool from the tenant with optimistic-lock retries. pub async fn delete_pool( Path((namespace, tenant_name, pool_name)): Path<(String, String, String)>, @@ -534,17 +927,13 @@ pub async fn delete_pool( } }; - let recorded_pool_status_exists = tenant.status.as_ref().is_some_and(|status| { - status - .pools - .iter() - .any(|pool_status| pool_status.ss_name == ss_name) - }); + let recorded_pool_status_requires_decommission = + has_recorded_pool_status_requiring_decommission(&tenant, &pool_name, &ss_name); match classify_pool_delete( tenant.spec.pools.len(), managed_statefulset_exists, - recorded_pool_status_exists, + recorded_pool_status_requires_decommission, )? { PoolDeleteDecision::RemoveFromSpec => {} PoolDeleteDecision::RequiresDecommission { reason } => { @@ -558,6 +947,7 @@ pub async fn delete_pool( } tenant.spec.pools.remove(pool_index); + remove_decommission_request(&mut tenant, &pool_name); match tenant_api .replace(&tenant_name, &Default::default(), &tenant) @@ -606,12 +996,26 @@ async fn create_client(claims: &Claims) -> Result { #[cfg(test)] mod tests { use super::{ - PoolDeleteDecision, classify_pool_delete, is_managed_pool_statefulset, - is_pool_observation_current, pool_delete_observation_pending_error, - pool_delete_requires_decommission_error, + PoolDeleteDecision, classify_pool_delete, decommission_cancel_is_allowed, + decommission_request_can_replace, has_recorded_pool_status_requiring_decommission, + is_managed_pool_statefulset, is_pool_observation_current, + pool_delete_observation_pending_error, pool_delete_requires_decommission_error, + remove_decommission_request, upsert_decommission_request, validate_lifecycle_request_id, }; use crate::console::error::Error; - use crate::types::v1alpha1::{status::Status, tenant::TenantSpec}; + use crate::types::v1alpha1::{ + pool_lifecycle::{ + DecommissionAction, DecommissionRequest, PoolLifecycleSpec, PvcRetentionPolicy, + }, + status::{ + Status, + pool::{ + Pool as PoolStatus, PoolDecommissionCleanupState, PoolDecommissionCleanupStatus, + PoolLifecycleState, PoolState, + }, + }, + tenant::TenantSpec, + }; use axum::http::StatusCode; use std::collections::BTreeMap; @@ -723,6 +1127,277 @@ mod tests { } } + #[test] + fn recorded_decommissioned_pool_status_does_not_block_spec_removal() { + let mut tenant = tenant_with_generations(2, Some(2)); + tenant.status = Some(Status { + observed_generation: Some(2), + pools: vec![PoolStatus { + name: Some("pool-a".to_string()), + ss_name: "logs-pool-a".to_string(), + state: PoolState::NotCreated, + lifecycle_state: Some(PoolLifecycleState::Decommissioned), + workload_state: Some(PoolState::NotCreated), + decommission: None, + replicas: None, + ready_replicas: None, + current_replicas: None, + updated_replicas: None, + current_revision: None, + update_revision: None, + last_update_time: None, + }], + ..Default::default() + }); + + assert!(!has_recorded_pool_status_requiring_decommission( + &tenant, + "pool-a", + "logs-pool-a" + )); + } + + #[test] + fn recorded_active_pool_status_blocks_spec_removal() { + let mut tenant = tenant_with_generations(2, Some(2)); + tenant.status = Some(Status { + observed_generation: Some(2), + pools: vec![PoolStatus { + name: Some("pool-a".to_string()), + ss_name: "logs-pool-a".to_string(), + state: PoolState::RolloutComplete, + lifecycle_state: Some(PoolLifecycleState::Active), + workload_state: Some(PoolState::RolloutComplete), + decommission: None, + replicas: Some(4), + ready_replicas: Some(4), + current_replicas: Some(4), + updated_replicas: Some(4), + current_revision: None, + update_revision: None, + last_update_time: None, + }], + ..Default::default() + }); + + assert!(has_recorded_pool_status_requiring_decommission( + &tenant, + "pool-a", + "logs-pool-a" + )); + } + + #[test] + fn lifecycle_request_id_is_required() { + assert!(validate_lifecycle_request_id("request-1").is_ok()); + assert!(matches!( + validate_lifecycle_request_id(" "), + Err(Error::BadRequest { .. }) + )); + } + + #[test] + fn cancel_decommission_replaces_existing_start_request() { + let mut tenant = tenant_with_generations(2, Some(2)); + tenant.spec.pool_lifecycle = Some(PoolLifecycleSpec { + pvc_retention_policy: PvcRetentionPolicy::Retain, + decommission_requests: vec![DecommissionRequest { + pool_name: "pool-a".to_string(), + request_id: "start-1".to_string(), + action: DecommissionAction::Start, + requested_at: Some("2026-05-20T00:00:00Z".to_string()), + cancel_requested_at: None, + reason: None, + }], + }); + + let existing = tenant + .spec + .pool_lifecycle + .as_ref() + .and_then(|lifecycle| lifecycle.request_for_pool("pool-a")) + .expect("request should exist"); + assert!(decommission_request_can_replace( + &tenant, + "pool-a", + existing, + "cancel-1", + &DecommissionAction::Cancel + )); + + upsert_decommission_request( + &mut tenant, + "pool-a", + "cancel-1", + DecommissionAction::Cancel, + Some("operator requested stop".to_string()), + ) + .expect("cancel should replace start"); + + let request = tenant + .spec + .pool_lifecycle + .as_ref() + .and_then(|lifecycle| lifecycle.request_for_pool("pool-a")) + .expect("request should remain"); + assert_eq!(request.request_id, "cancel-1"); + assert_eq!(request.action, DecommissionAction::Cancel); + assert!(request.cancel_requested_at.is_some()); + } + + #[test] + fn cancel_decommission_requires_active_request_or_status() { + let mut tenant = tenant_with_generations(2, Some(2)); + assert!(!decommission_cancel_is_allowed(&tenant, "pool-a", None)); + + tenant.status = Some(Status { + observed_generation: Some(2), + pools: vec![PoolStatus { + name: Some("pool-a".to_string()), + ss_name: "logs-pool-a".to_string(), + state: PoolState::RolloutComplete, + lifecycle_state: Some(PoolLifecycleState::Decommissioning), + workload_state: Some(PoolState::RolloutComplete), + decommission: None, + replicas: Some(4), + ready_replicas: Some(4), + current_replicas: Some(4), + updated_replicas: Some(4), + current_revision: None, + update_revision: None, + last_update_time: None, + }], + ..Default::default() + }); + assert!(decommission_cancel_is_allowed(&tenant, "pool-a", None)); + } + + #[test] + fn remove_decommission_request_drops_stale_pool_lifecycle_entry() { + let mut tenant = tenant_with_generations(2, Some(2)); + tenant.spec.pool_lifecycle = Some(PoolLifecycleSpec { + pvc_retention_policy: PvcRetentionPolicy::Retain, + decommission_requests: vec![ + DecommissionRequest { + pool_name: "pool-a".to_string(), + request_id: "start-a".to_string(), + action: DecommissionAction::Start, + requested_at: Some("2026-05-20T00:00:00Z".to_string()), + cancel_requested_at: None, + reason: None, + }, + DecommissionRequest { + pool_name: "pool-b".to_string(), + request_id: "start-b".to_string(), + action: DecommissionAction::Start, + requested_at: Some("2026-05-20T00:00:00Z".to_string()), + cancel_requested_at: None, + reason: None, + }, + ], + }); + + remove_decommission_request(&mut tenant, "pool-a"); + let requests = &tenant + .spec + .pool_lifecycle + .as_ref() + .expect("lifecycle should remain for pool-b") + .decommission_requests; + assert_eq!(requests.len(), 1); + assert_eq!(requests[0].pool_name, "pool-b"); + + remove_decommission_request(&mut tenant, "pool-b"); + assert!(tenant.spec.pool_lifecycle.is_none()); + } + + #[test] + fn start_decommission_can_replace_terminal_canceled_request() { + let mut tenant = tenant_with_generations(2, Some(2)); + tenant.spec.pool_lifecycle = Some(PoolLifecycleSpec { + pvc_retention_policy: PvcRetentionPolicy::Retain, + decommission_requests: vec![DecommissionRequest { + pool_name: "pool-a".to_string(), + request_id: "cancel-1".to_string(), + action: DecommissionAction::Cancel, + requested_at: Some("2026-05-20T00:00:00Z".to_string()), + cancel_requested_at: Some("2026-05-20T00:01:00Z".to_string()), + reason: None, + }], + }); + tenant.status = Some(Status { + observed_generation: Some(2), + pools: vec![PoolStatus { + name: Some("pool-a".to_string()), + ss_name: "logs-pool-a".to_string(), + state: PoolState::NotCreated, + lifecycle_state: Some(PoolLifecycleState::DecommissionCanceled), + workload_state: Some(PoolState::NotCreated), + decommission: None, + replicas: None, + ready_replicas: None, + current_replicas: None, + updated_replicas: None, + current_revision: None, + update_revision: None, + last_update_time: None, + }], + ..Default::default() + }); + + let existing = tenant + .spec + .pool_lifecycle + .as_ref() + .and_then(|lifecycle| lifecycle.request_for_pool("pool-a")) + .expect("request should exist"); + assert!(decommission_request_can_replace( + &tenant, + "pool-a", + existing, + "start-2", + &DecommissionAction::Start + )); + } + + #[test] + fn decommissioned_cleanup_state_is_non_blocking() { + let pool_status = PoolStatus { + name: Some("pool-a".to_string()), + ss_name: "logs-pool-a".to_string(), + state: PoolState::NotCreated, + lifecycle_state: Some(PoolLifecycleState::Decommissioned), + workload_state: Some(PoolState::NotCreated), + decommission: Some( + crate::types::v1alpha1::status::pool::PoolDecommissionStatus { + cleanup: Some(PoolDecommissionCleanupStatus { + state: PoolDecommissionCleanupState::PvcRetained, + stateful_set_deleted_at: Some("2026-05-20T00:00:00Z".to_string()), + pvc_retention_policy: Some("Retain".to_string()), + }), + request_id: Some("request-1".to_string()), + rustfs_pool_id: Some("1".to_string()), + endpoint_set_hash: Some("sha256:test".to_string()), + phase: None, + started_at: None, + last_poll_time: None, + completed_at: None, + progress: None, + last_error: None, + }, + ), + replicas: None, + ready_replicas: None, + current_replicas: None, + updated_replicas: None, + current_revision: None, + update_revision: None, + last_update_time: None, + }; + + assert!(!super::pool_status_requires_decommission_before_spec_removal(&pool_status)); + } + #[test] fn pool_observation_requires_current_generation() { assert!(!is_pool_observation_current(&tenant_with_generations( diff --git a/src/console/models/pool.rs b/src/console/models/pool.rs index 8f294d9..6e3cdfa 100755 --- a/src/console/models/pool.rs +++ b/src/console/models/pool.rs @@ -30,6 +30,16 @@ pub struct PoolDetails { pub current_revision: Option, pub update_revision: Option, pub state: String, + pub lifecycle_state: Option, + pub workload_state: Option, + pub decommission_phase: Option, + pub decommission_objects_migrated: Option, + pub decommission_bytes_migrated: Option, + pub decommission_objects_failed: Option, + pub decommission_bytes_failed: Option, + pub decommission_cleanup_state: Option, + pub decommission_last_error: Option, + pub decommission_last_poll_time: Option, pub created_at: Option, } @@ -83,3 +93,30 @@ pub struct AddPoolResponse { pub message: String, pub pool: PoolDetails, } + +/// Request body to start decommissioning a pool. +#[derive(Debug, Deserialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct StartPoolDecommissionRequest { + pub request_id: String, + pub reason: Option, +} + +/// Request body to cancel pool decommissioning. +#[derive(Debug, Deserialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct CancelPoolDecommissionRequest { + pub request_id: String, + pub reason: Option, +} + +/// Response after writing a pool decommission lifecycle request. +#[derive(Debug, Serialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct PoolDecommissionRequestResponse { + pub success: bool, + pub message: String, + pub pool_name: String, + pub request_id: String, + pub action: String, +} diff --git a/src/console/openapi.rs b/src/console/openapi.rs index 3e0e70e..649af5d 100644 --- a/src/console/openapi.rs +++ b/src/console/openapi.rs @@ -34,8 +34,9 @@ use crate::console::models::pod::{ PodListItem, PodListResponse, PodStatus, RestartPodRequest, VolumeInfo, }; use crate::console::models::pool::{ - AddPoolRequest, AddPoolResponse, DeletePoolResponse, PoolDetails, PoolListResponse, - ResourceList, ResourceRequirements, + AddPoolRequest, AddPoolResponse, CancelPoolDecommissionRequest, DeletePoolResponse, + PoolDecommissionRequestResponse, PoolDetails, PoolListResponse, ResourceList, + ResourceRequirements, StartPoolDecommissionRequest, }; use crate::console::models::tenant::{ CreatePoolRequest, CreateTenantRequest, DeleteTenantResponse, EnvVar, LoggingConfig, PoolInfo, @@ -67,6 +68,8 @@ use crate::console::models::topology::{ api_list_pools, api_add_pool, api_delete_pool, + api_start_pool_decommission, + api_cancel_pool_decommission, api_list_pods, api_get_pod, api_delete_pod, @@ -111,6 +114,9 @@ use crate::console::models::topology::{ ResourceList, AddPoolResponse, DeletePoolResponse, + StartPoolDecommissionRequest, + CancelPoolDecommissionRequest, + PoolDecommissionRequestResponse, PodListItem, PodListResponse, PodDetails, @@ -270,6 +276,58 @@ fn api_delete_pool() -> Json { unimplemented!("Documentation only") } +#[utoipa::path( + post, + path = "/api/v1/namespaces/{namespace}/tenants/{name}/pools/{pool}/decommission", + params( + ("namespace" = String, Path), + ("name" = String, Path), + ("pool" = String, Path) + ), + request_body = StartPoolDecommissionRequest, + responses( + (status = 200, body = PoolDecommissionRequestResponse), + (status = 400, body = ConsoleErrorResponse), + (status = 401, body = ConsoleErrorResponse), + (status = 403, body = ConsoleErrorResponse), + (status = 404, body = ConsoleErrorResponse), + (status = 409, body = ConsoleErrorResponse), + (status = 500, body = ConsoleErrorResponse) + ), + tag = "pools" +)] +fn api_start_pool_decommission( + _body: Json, +) -> Json { + unimplemented!("Documentation only") +} + +#[utoipa::path( + post, + path = "/api/v1/namespaces/{namespace}/tenants/{name}/pools/{pool}/decommission/cancel", + params( + ("namespace" = String, Path), + ("name" = String, Path), + ("pool" = String, Path) + ), + request_body = CancelPoolDecommissionRequest, + responses( + (status = 200, body = PoolDecommissionRequestResponse), + (status = 400, body = ConsoleErrorResponse), + (status = 401, body = ConsoleErrorResponse), + (status = 403, body = ConsoleErrorResponse), + (status = 404, body = ConsoleErrorResponse), + (status = 409, body = ConsoleErrorResponse), + (status = 500, body = ConsoleErrorResponse) + ), + tag = "pools" +)] +fn api_cancel_pool_decommission( + _body: Json, +) -> Json { + unimplemented!("Documentation only") +} + // --- Pods --- #[utoipa::path(get, path = "/api/v1/namespaces/{namespace}/tenants/{name}/pods", params(("namespace" = String, Path), ("name" = String, Path)), responses((status = 200, body = PodListResponse)), tag = "pods")] fn api_list_pods() -> Json { diff --git a/src/console/routes/mod.rs b/src/console/routes/mod.rs index 38facc2..5536cdb 100755 --- a/src/console/routes/mod.rs +++ b/src/console/routes/mod.rs @@ -97,6 +97,14 @@ pub fn pool_routes() -> Router { "/namespaces/:namespace/tenants/:name/pools/:pool", delete(handlers::pools::delete_pool), ) + .route( + "/namespaces/:namespace/tenants/:name/pools/:pool/decommission", + post(handlers::pools::start_pool_decommission), + ) + .route( + "/namespaces/:namespace/tenants/:name/pools/:pool/decommission/cancel", + post(handlers::pools::cancel_pool_decommission), + ) } /// Pod list, detail, delete, restart, logs diff --git a/src/context.rs b/src/context.rs index e76e5d6..a3ccff2 100755 --- a/src/context.rs +++ b/src/context.rs @@ -241,14 +241,26 @@ impl Context { } pub async fn delete(&self, name: &str, namespace: &str) -> Result<(), Error> + where + T: Resource + Clone + DeserializeOwned + Debug, + ::DynamicType: Default, + { + self.delete_with_params::(name, namespace, &DeleteParams::default()) + .await + } + + pub async fn delete_with_params( + &self, + name: &str, + namespace: &str, + params: &DeleteParams, + ) -> Result<(), Error> where T: Resource + Clone + DeserializeOwned + Debug, ::DynamicType: Default, { let api: Api = Api::namespaced(self.client.clone(), namespace); - api.delete(name, &DeleteParams::default()) - .context(KubeSnafu) - .await?; + api.delete(name, params).context(KubeSnafu).await?; Ok(()) } diff --git a/src/reconcile.rs b/src/reconcile.rs index 8d206a7..ca0c805 100755 --- a/src/reconcile.rs +++ b/src/reconcile.rs @@ -28,13 +28,15 @@ use std::time::Duration; use tracing::{debug, error, info, warn}; mod phases; +mod pool_lifecycle; mod tls; use phases::{ - finalize_tenant_status, maybe_cleanup_terminating_pods, reconcile_pool_statefulsets, - reconcile_rbac_resources, reconcile_services, validate_no_pool_rename, - validate_tenant_prerequisites, + cleanup_removed_decommissioned_pool_statefulsets, finalize_tenant_status, + maybe_cleanup_terminating_pods, reconcile_pool_statefulsets, reconcile_rbac_resources, + reconcile_services, validate_no_pool_rename, validate_tenant_prerequisites, }; +use pool_lifecycle::reconcile_pool_lifecycle; #[derive(Snafu, Debug)] pub enum Error { @@ -77,9 +79,28 @@ pub async fn reconcile_rustfs(tenant: Arc, ctx: Arc) -> Result< reconcile_services(&ctx, &latest_tenant, &ns, &tls_plan).await?; - validate_no_pool_rename(&ctx, &latest_tenant, &ns).await?; - - let summary = reconcile_pool_statefulsets(&ctx, &latest_tenant, &ns, &tls_plan).await?; + let removed_pool_cleanup = + cleanup_removed_decommissioned_pool_statefulsets(&ctx, &latest_tenant, &ns).await?; + + validate_no_pool_rename( + &ctx, + &latest_tenant, + &ns, + &removed_pool_cleanup.allowed_removed_pool_names, + ) + .await?; + + let lifecycle_decisions = reconcile_pool_lifecycle(&ctx, &latest_tenant, &ns).await?; + + let summary = reconcile_pool_statefulsets( + &ctx, + &latest_tenant, + &ns, + &tls_plan, + &lifecycle_decisions, + &removed_pool_cleanup, + ) + .await?; finalize_tenant_status(&ctx, &latest_tenant, summary, tls_plan).await } diff --git a/src/reconcile/phases.rs b/src/reconcile/phases.rs index a051f0a..2a3f250 100644 --- a/src/reconcile/phases.rs +++ b/src/reconcile/phases.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use super::pool_lifecycle::{PoolLifecycleDecision, PoolLifecycleDecisions}; use super::{ Error, cleanup_stuck_terminating_pods_on_down_nodes, context, context_result, patch_status_and_record, patch_status_error, statefulset_owned_by_tenant, types_result, @@ -19,13 +20,15 @@ use super::{ use crate::context::Context; use crate::status::{StatusBuilder, StatusError}; use crate::types; +use crate::types::v1alpha1::status::pool::PoolLifecycleState; use crate::types::v1alpha1::status::{ConditionType, Reason}; use crate::types::v1alpha1::tenant::Tenant; use crate::types::v1alpha1::tls::TlsPlan; use kube::ResourceExt; -use kube::api::ListParams; +use kube::api::{DeleteParams, ListParams, PropagationPolicy}; use kube::runtime::controller::Action; use kube::runtime::events::EventType; +use std::collections::HashSet; use std::time::Duration; use tracing::{debug, error, warn}; @@ -34,10 +37,32 @@ pub(super) struct PoolReconcileSummary { pool_statuses: Vec, any_updating: bool, any_degraded: bool, + any_lifecycle_reconciling: bool, + any_removed_pool_cleanup_reconciling: bool, + any_lifecycle_decommissioned: bool, + any_lifecycle_failed: bool, + any_lifecycle_canceled: bool, + lifecycle_requeue_after: Option, total_replicas: i32, ready_replicas: i32, } +const REMOVED_POOL_CLEANUP_REQUEUE_INTERVAL: Duration = Duration::from_secs(10); + +#[derive(Default)] +pub(super) struct RemovedDecommissionedPoolCleanup { + pub(super) allowed_removed_pool_names: HashSet, + pub(super) any_reconciling: bool, + pub(super) requeue_after: Option, +} + +impl RemovedDecommissionedPoolCleanup { + fn mark_reconciling(&mut self) { + self.any_reconciling = true; + self.requeue_after = Some(REMOVED_POOL_CLEANUP_REQUEUE_INTERVAL); + } +} + pub(super) async fn validate_tenant_prerequisites( ctx: &Context, tenant: &Tenant, @@ -188,10 +213,116 @@ pub(super) async fn reconcile_services( Ok(()) } +pub(super) async fn cleanup_removed_decommissioned_pool_statefulsets( + ctx: &Context, + tenant: &Tenant, + namespace: &str, +) -> Result { + let owned_statefulsets = context_result( + ctx.list_with_params::( + namespace, + &ListParams::default().labels(&format!("rustfs.tenant={}", tenant.name())), + ) + .await, + ctx, + tenant, + ) + .await?; + + let current_pool_names: HashSet<_> = + tenant.spec.pools.iter().map(|p| p.name.as_str()).collect(); + let tenant_prefix = format!("{}-", tenant.name()); + let mut cleanup = RemovedDecommissionedPoolCleanup::default(); + + for ss in owned_statefulsets + .iter() + .filter(|ss| statefulset_owned_by_tenant(ss, tenant)) + { + let Some(ss_name) = ss.metadata.name.as_deref() else { + continue; + }; + let Some(pool_name) = ss_name.strip_prefix(&tenant_prefix) else { + continue; + }; + if current_pool_names.contains(pool_name) { + continue; + } + if !removed_pool_is_decommissioned(tenant, pool_name, ss_name) { + continue; + } + + cleanup + .allowed_removed_pool_names + .insert(pool_name.to_string()); + if ss.metadata.deletion_timestamp.is_some() { + cleanup.mark_reconciling(); + continue; + } + + let delete_params = DeleteParams { + propagation_policy: Some(PropagationPolicy::Background), + ..DeleteParams::default() + }; + debug!( + tenant = %tenant.name(), + namespace = %namespace, + pool = %pool_name, + statefulset = %ss_name, + "deleting StatefulSet for removed decommissioned pool" + ); + let delete_requested = match ctx + .delete_with_params::( + ss_name, + namespace, + &delete_params, + ) + .await + { + Ok(()) => true, + Err(error) if is_not_found_context_error(&error) => false, + Err(error) => { + let status_error = StatusError::from_context_error(&error); + patch_status_error(ctx, tenant, &status_error).await; + return Err(error.into()); + } + }; + + if delete_requested { + cleanup.mark_reconciling(); + let _ = ctx + .record( + tenant, + EventType::Normal, + "PoolRemoved", + &format!( + "Deleting StatefulSet '{}' after decommissioned pool was removed from spec", + ss_name + ), + ) + .await; + } + } + + Ok(cleanup) +} + +fn removed_pool_is_decommissioned(tenant: &Tenant, pool_name: &str, ss_name: &str) -> bool { + tenant.status.as_ref().is_some_and(|status| { + status.pools.iter().any(|pool_status| { + (pool_status.name.as_deref() == Some(pool_name) || pool_status.ss_name == ss_name) + && matches!( + pool_status.lifecycle_state, + Some(PoolLifecycleState::Decommissioned) + ) + }) + }) +} + pub(super) async fn validate_no_pool_rename( ctx: &Context, tenant: &Tenant, namespace: &str, + allowed_removed_pool_names: &HashSet, ) -> Result<(), Error> { let owned_statefulsets = context_result( ctx.list_with_params::( @@ -224,6 +355,7 @@ pub(super) async fn validate_no_pool_rename( let mut removed_pool_names: Vec<_> = existing_pool_names .iter() .filter(|pool_name| !current_pool_names.contains(pool_name.as_str())) + .filter(|pool_name| !allowed_removed_pool_names.contains(*pool_name)) .cloned() .collect(); removed_pool_names.sort_unstable(); @@ -275,11 +407,38 @@ pub(super) async fn reconcile_pool_statefulsets( tenant: &Tenant, namespace: &str, tls_plan: &TlsPlan, + lifecycle_decisions: &PoolLifecycleDecisions, + removed_pool_cleanup: &RemovedDecommissionedPoolCleanup, ) -> Result { - let mut summary = PoolReconcileSummary::default(); + let mut summary = PoolReconcileSummary { + any_lifecycle_reconciling: lifecycle_decisions.any_reconciling, + any_removed_pool_cleanup_reconciling: removed_pool_cleanup.any_reconciling, + any_lifecycle_failed: lifecycle_decisions.any_failed, + any_lifecycle_canceled: lifecycle_decisions.any_canceled, + lifecycle_requeue_after: earliest_requeue_after( + lifecycle_decisions.requeue_after, + removed_pool_cleanup.requeue_after, + ), + ..Default::default() + }; for pool in &tenant.spec.pools { let ss_name = format!("{}-{}", tenant.name(), pool.name); + let lifecycle_decision = lifecycle_decisions.decision_for(&pool.name); + if lifecycle_decision.is_some_and(|decision| decision.skip_workload_reconcile) { + reconcile_lifecycle_gated_pool_statefulset( + ctx, + tenant, + namespace, + pool, + &ss_name, + lifecycle_decision, + &mut summary, + ) + .await?; + continue; + } + match ctx .get::(&ss_name, namespace) .await @@ -320,6 +479,15 @@ pub(super) async fn reconcile_pool_statefulsets( Ok(summary) } +fn earliest_requeue_after(left: Option, right: Option) -> Option { + match (left, right) { + (Some(left), Some(right)) => Some(left.min(right)), + (Some(left), None) => Some(left), + (None, Some(right)) => Some(right), + (None, None) => None, + } +} + fn is_not_found_context_error(error: &context::Error) -> bool { matches!( error, @@ -329,6 +497,76 @@ fn is_not_found_context_error(error: &context::Error) -> bool { ) } +async fn reconcile_lifecycle_gated_pool_statefulset( + ctx: &Context, + tenant: &Tenant, + namespace: &str, + pool: &crate::types::v1alpha1::pool::Pool, + ss_name: &str, + lifecycle_decision: Option<&PoolLifecycleDecision>, + summary: &mut PoolReconcileSummary, +) -> Result<(), Error> { + debug!( + tenant = %tenant.name(), + namespace = %namespace, + pool = %pool.name, + "skipping normal StatefulSet reconcile because pool lifecycle gate is active" + ); + + let mut pool_status = match ctx + .get::(ss_name, namespace) + .await + { + Ok(ss) => tenant.build_pool_status(&pool.name, &ss), + Err(error) if is_not_found_context_error(&error) => missing_pool_status(tenant, &pool.name), + Err(error) => { + let status_error = StatusError::from_context_error(&error); + patch_status_error(ctx, tenant, &status_error).await; + return Err(error.into()); + } + }; + + if let Some(decision) = lifecycle_decision { + apply_lifecycle_decision(&mut pool_status, decision); + } + + update_pool_summary(summary, pool_status); + + Ok(()) +} + +fn missing_pool_status( + tenant: &Tenant, + pool_name: &str, +) -> crate::types::v1alpha1::status::pool::Pool { + crate::types::v1alpha1::status::pool::Pool { + name: Some(pool_name.to_string()), + ss_name: format!("{}-{}", tenant.name(), pool_name), + state: crate::types::v1alpha1::status::pool::PoolState::NotCreated, + lifecycle_state: Some(PoolLifecycleState::Active), + workload_state: Some(crate::types::v1alpha1::status::pool::PoolState::NotCreated), + decommission: None, + replicas: None, + ready_replicas: None, + current_replicas: None, + updated_replicas: None, + current_revision: None, + update_revision: None, + last_update_time: Some( + chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Secs, true), + ), + } +} + +fn apply_lifecycle_decision( + pool_status: &mut crate::types::v1alpha1::status::pool::Pool, + decision: &PoolLifecycleDecision, +) { + pool_status.lifecycle_state = Some(decision.state.clone()); + pool_status.workload_state = Some(pool_status.state.clone()); + pool_status.decommission = decision.decommission.clone(); +} + async fn reconcile_existing_pool_statefulset( ctx: &Context, tenant: &Tenant, @@ -457,6 +695,22 @@ fn update_pool_summary( _ => {} } + match pool_status.lifecycle_state { + Some(PoolLifecycleState::Decommissioning) => summary.any_lifecycle_reconciling = true, + Some(PoolLifecycleState::Decommissioned) => summary.any_lifecycle_decommissioned = true, + Some(PoolLifecycleState::DecommissionFailed) => summary.any_lifecycle_failed = true, + Some(PoolLifecycleState::DecommissionCanceled) => summary.any_lifecycle_canceled = true, + _ => {} + } + + if matches!( + pool_status.lifecycle_state, + Some(PoolLifecycleState::Decommissioned) + ) { + summary.pool_statuses.push(pool_status); + return; + } + if let Some(replicas) = pool_status.replicas { summary.total_replicas += replicas; } @@ -479,7 +733,65 @@ pub(super) async fn finalize_tenant_status( builder.set_tls_status(tls_status); } - let (event_condition, event_reason, event_type, event_message) = if summary.any_degraded { + let (event_condition, event_reason, event_type, event_message) = if summary.any_lifecycle_failed + { + builder.finish_degraded( + Reason::PoolDecommissionFailed, + ConditionType::PoolsReady, + "One or more pool decommission operations failed".to_string(), + ); + ( + ConditionType::PoolsReady, + Reason::PoolDecommissionFailed, + EventType::Warning, + "One or more pool decommission operations failed".to_string(), + ) + } else if summary.any_lifecycle_canceled { + builder.finish_degraded( + Reason::PoolDecommissionCanceled, + ConditionType::PoolsReady, + "One or more pool decommission operations were canceled".to_string(), + ); + ( + ConditionType::PoolsReady, + Reason::PoolDecommissionCanceled, + EventType::Warning, + "One or more pool decommission operations were canceled".to_string(), + ) + } else if summary.any_removed_pool_cleanup_reconciling { + builder.finish_reconciling( + Reason::PoolDecommissioning, + "Decommissioned pool cleanup is in progress".to_string(), + ); + ( + ConditionType::PoolsReady, + Reason::PoolDecommissioning, + EventType::Normal, + "Decommissioned pool cleanup is in progress".to_string(), + ) + } else if summary.any_lifecycle_reconciling { + builder.finish_reconciling( + Reason::PoolDecommissioning, + "Pool decommission is in progress".to_string(), + ); + ( + ConditionType::PoolsReady, + Reason::PoolDecommissioning, + EventType::Normal, + "Pool decommission is in progress".to_string(), + ) + } else if summary.any_lifecycle_decommissioned { + builder.finish_reconciling( + Reason::PoolDecommissioned, + "Pool decommission completed; remove the pool from spec to finish cleanup".to_string(), + ); + ( + ConditionType::PoolsReady, + Reason::PoolDecommissioned, + EventType::Normal, + "Pool decommission completed; remove the pool from spec to finish cleanup".to_string(), + ) + } else if summary.any_degraded { builder.finish_degraded( Reason::PoolDegraded, ConditionType::PoolsReady, @@ -545,10 +857,46 @@ pub(super) async fn finalize_tenant_status( ) .await?; - if summary.any_updating { + if let Some(requeue_after) = summary.lifecycle_requeue_after { + debug!( + seconds = requeue_after.as_secs(), + "Pool lifecycle is active, requeuing" + ); + Ok(Action::requeue(requeue_after)) + } else if summary.any_updating { debug!("Pools are updating, requeuing in 10 seconds"); Ok(Action::requeue(Duration::from_secs(10))) } else { Ok(Action::await_change()) } } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn removed_pool_cleanup_marks_reconciling_and_requeues() { + let mut cleanup = RemovedDecommissionedPoolCleanup::default(); + + cleanup.mark_reconciling(); + + assert!(cleanup.any_reconciling); + assert_eq!( + cleanup.requeue_after, + Some(REMOVED_POOL_CLEANUP_REQUEUE_INTERVAL) + ); + } + + #[test] + fn earliest_requeue_after_prefers_shorter_duration() { + assert_eq!( + earliest_requeue_after(Some(Duration::from_secs(30)), Some(Duration::from_secs(10))), + Some(Duration::from_secs(10)) + ); + assert_eq!( + earliest_requeue_after(None, Some(Duration::from_secs(10))), + Some(Duration::from_secs(10)) + ); + } +} diff --git a/src/reconcile/pool_lifecycle.rs b/src/reconcile/pool_lifecycle.rs new file mode 100644 index 0000000..fe79602 --- /dev/null +++ b/src/reconcile/pool_lifecycle.rs @@ -0,0 +1,1236 @@ +// Copyright 2025 RustFS Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::BTreeMap; +use std::time::Duration; + +use k8s_openapi::api::apps::v1::StatefulSet; +use kube::api::{DeleteParams, PropagationPolicy}; +use kube::runtime::events::EventType; +use sha2::{Digest, Sha256}; +use tracing::warn; + +use super::{Error, context}; +use crate::context::Context; +use crate::sts::rustfs_client::{ + RustfsAdminClient, RustfsClientError, RustfsPoolDecommissionInfo, RustfsPoolListItem, + RustfsPoolStatus, +}; +use crate::types::v1alpha1::pool::Pool; +use crate::types::v1alpha1::pool_lifecycle::{DecommissionAction, DecommissionRequest}; +use crate::types::v1alpha1::status::pool::{ + PoolDecommissionCleanupState, PoolDecommissionCleanupStatus, PoolDecommissionLastError, + PoolDecommissionPhase, PoolDecommissionProgress, PoolDecommissionStatus, PoolLifecycleState, +}; +use crate::types::v1alpha1::tenant::Tenant; + +const POLL_INTERVAL: Duration = Duration::from_secs(30); + +#[derive(Default)] +pub(super) struct PoolLifecycleDecisions { + decisions: BTreeMap, + pub(super) any_reconciling: bool, + pub(super) any_failed: bool, + pub(super) any_canceled: bool, + pub(super) requeue_after: Option, +} + +#[derive(Clone, Debug)] +pub(super) struct PoolLifecycleDecision { + pub(super) state: PoolLifecycleState, + pub(super) decommission: Option, + pub(super) skip_workload_reconcile: bool, +} + +struct MatchedRustfsPool { + item: RustfsPoolListItem, + expected_cmd_line: String, + expected_endpoint_set_hash: String, +} + +enum PoolMappingError { + ListFailed(String), + NotFound(String), + Ambiguous(String), + TenantNamespace(String), +} + +impl PoolMappingError { + fn reason(&self) -> &'static str { + match self { + Self::ListFailed(_) => "RustfsPoolListFailed", + Self::NotFound(_) => "RustfsPoolMappingFailed", + Self::Ambiguous(_) => "RustfsPoolMappingAmbiguous", + Self::TenantNamespace(_) => "TenantNamespaceMissing", + } + } + + fn message(&self) -> &str { + match self { + Self::ListFailed(message) + | Self::NotFound(message) + | Self::Ambiguous(message) + | Self::TenantNamespace(message) => message, + } + } + + fn is_retriable(&self) -> bool { + matches!(self, Self::ListFailed(_)) + } +} + +impl PoolLifecycleDecisions { + pub(super) fn decision_for(&self, pool_name: &str) -> Option<&PoolLifecycleDecision> { + self.decisions.get(pool_name) + } + + fn insert(&mut self, pool_name: String, decision: PoolLifecycleDecision) { + match decision.state { + PoolLifecycleState::Decommissioning => { + self.any_reconciling = true; + self.requeue_after = Some(POLL_INTERVAL); + } + PoolLifecycleState::Decommissioned + if decision + .decommission + .as_ref() + .is_some_and(decommissioned_cleanup_needs_requeue) => + { + self.any_reconciling = true; + self.requeue_after = Some(POLL_INTERVAL); + } + PoolLifecycleState::DecommissionFailed => self.any_failed = true, + PoolLifecycleState::DecommissionCanceled => self.any_canceled = true, + _ => {} + } + + self.decisions.insert(pool_name, decision); + } +} + +pub(super) async fn reconcile_pool_lifecycle( + ctx: &Context, + tenant: &Tenant, + namespace: &str, +) -> Result { + let mut decisions = PoolLifecycleDecisions::default(); + + for pool in &tenant.spec.pools { + let request = tenant + .spec + .pool_lifecycle + .as_ref() + .and_then(|lifecycle| lifecycle.request_for_pool(&pool.name)); + let existing = existing_decommission_status(tenant, &pool.name); + let existing_state = existing_lifecycle_state(tenant, &pool.name); + + let should_continue = matches!( + existing_state, + Some( + PoolLifecycleState::Decommissioning + | PoolLifecycleState::Decommissioned + | PoolLifecycleState::DecommissionCanceled + | PoolLifecycleState::DecommissionFailed + ) + ); + + if request.is_none() && !should_continue { + continue; + } + + let decision = reconcile_single_pool_lifecycle( + ctx, + tenant, + namespace, + pool, + request, + existing_state, + existing, + ) + .await; + + decisions.insert(pool.name.clone(), decision); + } + + Ok(decisions) +} + +async fn reconcile_single_pool_lifecycle( + ctx: &Context, + tenant: &Tenant, + namespace: &str, + pool: &Pool, + request: Option<&DecommissionRequest>, + existing_state: Option, + existing: Option, +) -> PoolLifecycleDecision { + if matches!(existing_state, Some(PoolLifecycleState::Decommissioned)) { + let status = existing.unwrap_or_else(|| PoolDecommissionStatus { + phase: Some(PoolDecommissionPhase::Complete), + ..empty_decommission_status() + }); + + if cleanup_already_authorized_or_complete(&status) { + return cleanup_decommissioned_pool(ctx, tenant, namespace, pool, status).await; + } + + let client = match rustfs_admin_client(ctx, tenant).await { + Ok(client) => client, + Err(error) => { + return cleanup_retriable_decision( + status, + "RustfsAdminClientError", + &error.to_string(), + ); + } + }; + + let status = + match verify_decommissioned_pool_for_cleanup(&client, tenant, namespace, pool, &status) + .await + { + Ok(status) => status, + Err(decision) => return decision, + }; + + return cleanup_decommissioned_pool(ctx, tenant, namespace, pool, status).await; + } + + let Some(request) = request else { + return terminal_decision_from_existing(existing_state, existing); + }; + + if request.request_id.trim().is_empty() { + return failed_decision( + Some(request.request_id.clone()), + "InvalidRequest", + "pool decommission requestID must not be empty", + ); + } + + if request.action == DecommissionAction::Start && tenant.spec.pools.len() <= 1 { + return failed_decision( + Some(request.request_id.clone()), + "LastPoolBlocked", + "cannot decommission the last pool in a tenant", + ); + } + + let client = match rustfs_admin_client(ctx, tenant).await { + Ok(client) => client, + Err(error) => { + return retriable_decision( + Some(request.request_id.clone()), + "RustfsAdminClientError", + &error.to_string(), + ); + } + }; + + let matched_pool = match find_rustfs_pool(&client, tenant, namespace, pool).await { + Ok(pool_item) => pool_item, + Err(error) if error.is_retriable() => { + return retriable_decision( + Some(request.request_id.clone()), + error.reason(), + error.message(), + ); + } + Err(error) => { + return failed_decision( + Some(request.request_id.clone()), + error.reason(), + error.message(), + ); + } + }; + let pool_id = matched_pool.item.id.to_string(); + + if cancel_without_decommission_info_is_noop(request, matched_pool.item.decommission.as_ref()) { + return active_lifecycle_decision(); + } + + if request.action == DecommissionAction::Start + && match should_start_decommission( + existing_state.as_ref(), + existing.as_ref(), + &matched_pool.item, + request, + ) { + Ok(should_start) => should_start, + Err(message) => { + return failed_decision( + Some(request.request_id.clone()), + "RustfsDecommissionAlreadyRunning", + &message, + ); + } + } + && let Err(error) = client.start_pool_decommission_by_id(&pool_id).await + { + return retriable_decision( + Some(request.request_id.clone()), + "RustfsDecommissionStartFailed", + &error.to_string(), + ); + } + + if request.action == DecommissionAction::Cancel + && !matches!( + existing_state, + Some(PoolLifecycleState::DecommissionCanceled) + ) + && let Err(error) = client.cancel_pool_decommission_by_id(&pool_id).await + { + return retriable_decision( + Some(request.request_id.clone()), + "RustfsDecommissionCancelFailed", + &error.to_string(), + ); + } + + let rustfs_status = match client.pool_status_by_id(&pool_id).await { + Ok(status) => status, + Err(error) => { + return retriable_decision( + Some(request.request_id.clone()), + "RustfsDecommissionStatusFailed", + &error.to_string(), + ); + } + }; + + if cancel_without_decommission_info_is_noop(request, rustfs_status.decommission.as_ref()) { + return active_lifecycle_decision(); + } + + let status = match decommission_status_from_rustfs( + request, + &rustfs_status, + &matched_pool.expected_cmd_line, + &matched_pool.expected_endpoint_set_hash, + ) { + Ok(status) => status, + Err(message) => { + return failed_decision( + Some(request.request_id.clone()), + "RustfsPoolIdentityMismatch", + &message, + ); + } + }; + match lifecycle_state_from_status(&status) { + PoolLifecycleState::Decommissioned => { + cleanup_decommissioned_pool(ctx, tenant, namespace, pool, status).await + } + state => PoolLifecycleDecision { + state, + decommission: Some(status), + skip_workload_reconcile: true, + }, + } +} + +async fn rustfs_admin_client( + ctx: &Context, + tenant: &Tenant, +) -> Result { + let credentials = RustfsAdminClient::load_tenant_credentials(&ctx.client, tenant).await?; + if tenant.spec.tls.as_ref().is_some_and(|tls| tls.is_enabled()) { + RustfsAdminClient::from_tls_tenant_for_sts(&ctx.client, tenant, credentials).await + } else { + RustfsAdminClient::from_tenant(tenant, credentials) + } +} + +async fn find_rustfs_pool( + client: &RustfsAdminClient, + tenant: &Tenant, + namespace: &str, + pool: &Pool, +) -> Result { + let expected_cmd_line = expected_pool_cmd_line(tenant, namespace, pool) + .map_err(PoolMappingError::TenantNamespace)?; + let expected_endpoint_set_hash = endpoint_set_hash(&expected_cmd_line); + let pools = client + .list_pools() + .await + .map_err(|error| PoolMappingError::ListFailed(error.to_string()))?; + + let mut matches = pools + .into_iter() + .filter(|item| same_cmd_line(&item.cmd_line, &expected_cmd_line)) + .collect::>(); + + if matches.len() > 1 { + return Err(PoolMappingError::Ambiguous(format!( + "RustFS admin pool list returned {} pools matching cmdline '{}'", + matches.len(), + expected_cmd_line + ))); + } + + matches + .pop() + .map(|item| MatchedRustfsPool { + item, + expected_cmd_line: expected_cmd_line.clone(), + expected_endpoint_set_hash, + }) + .ok_or_else(|| { + PoolMappingError::NotFound(format!( + "RustFS admin pool list did not contain expected cmdline '{}'", + expected_cmd_line + )) + }) +} + +fn expected_pool_cmd_line(tenant: &Tenant, namespace: &str, pool: &Pool) -> Result { + let scheme = if tenant + .spec + .tls + .as_ref() + .is_some_and(|tls| tls.enable_internode_https) + { + "https" + } else { + "http" + }; + if pool.servers <= 0 || pool.persistence.volumes_per_server <= 0 { + return Err(format!( + "pool '{}' has invalid servers or volumesPerServer", + pool.name + )); + } + + Ok(tenant.rustfs_pool_volume_spec(pool, scheme, namespace)) +} + +fn same_cmd_line(left: &str, right: &str) -> bool { + left.trim() == right.trim() +} + +fn validate_rustfs_pool_identity( + status: &RustfsPoolStatus, + expected_cmd_line: &str, + expected_endpoint_set_hash: &str, +) -> Result<(), String> { + if !same_cmd_line(&status.cmd_line, expected_cmd_line) { + return Err(format!( + "RustFS status cmdline '{}' does not match expected cmdline '{}'", + status.cmd_line, expected_cmd_line + )); + } + + let observed_hash = endpoint_set_hash(&status.cmd_line); + if observed_hash != expected_endpoint_set_hash { + return Err(format!( + "RustFS endpoint set hash '{}' does not match expected hash '{}'", + observed_hash, expected_endpoint_set_hash + )); + } + + Ok(()) +} + +async fn verify_decommissioned_pool_for_cleanup( + client: &RustfsAdminClient, + tenant: &Tenant, + namespace: &str, + pool: &Pool, + existing: &PoolDecommissionStatus, +) -> Result { + let matched_pool = match find_rustfs_pool(client, tenant, namespace, pool).await { + Ok(matched_pool) => matched_pool, + Err(error) if error.is_retriable() => { + return Err(cleanup_retriable_decision( + existing.clone(), + error.reason(), + error.message(), + )); + } + Err(error) => { + return Err(failed_decision( + existing.request_id.clone(), + error.reason(), + error.message(), + )); + } + }; + let pool_id = matched_pool.item.id.to_string(); + + let Some(existing_pool_id) = existing.rustfs_pool_id.as_deref() else { + return Err(failed_decision( + existing.request_id.clone(), + "RustfsPoolIdentityMissing", + "recorded decommission status is missing rustfsPoolID; refusing cleanup", + )); + }; + if existing_pool_id != pool_id { + let message = format!( + "recorded RustFS pool id '{}' no longer matches observed pool id '{}'", + existing_pool_id, pool_id + ); + return Err(failed_decision( + existing.request_id.clone(), + "RustfsPoolIdentityMismatch", + &message, + )); + } + + let Some(existing_hash) = existing.endpoint_set_hash.as_deref() else { + return Err(failed_decision( + existing.request_id.clone(), + "RustfsPoolIdentityMissing", + "recorded decommission status is missing endpointSetHash; refusing cleanup", + )); + }; + if existing_hash != matched_pool.expected_endpoint_set_hash { + return Err(failed_decision( + existing.request_id.clone(), + "RustfsPoolIdentityMismatch", + "recorded endpoint set hash no longer matches the expected pool cmdline", + )); + } + + let rustfs_status = match client.pool_status_by_id(&pool_id).await { + Ok(status) => status, + Err(error) => { + return Err(cleanup_retriable_decision( + existing.clone(), + "RustfsDecommissionStatusFailed", + &error.to_string(), + )); + } + }; + + let request_id = existing + .request_id + .clone() + .unwrap_or_else(|| "recorded-decommission".to_string()); + let request = DecommissionRequest { + pool_name: pool.name.clone(), + request_id, + action: DecommissionAction::Start, + requested_at: None, + cancel_requested_at: None, + reason: None, + }; + + let status = decommission_status_from_rustfs( + &request, + &rustfs_status, + &matched_pool.expected_cmd_line, + &matched_pool.expected_endpoint_set_hash, + ) + .map_err(|message| { + failed_decision( + existing.request_id.clone(), + "RustfsPoolIdentityMismatch", + &message, + ) + })?; + + if !matches!(status.phase, Some(PoolDecommissionPhase::Complete)) { + return Err(failed_decision( + existing.request_id.clone(), + "RustfsDecommissionNotComplete", + "RustFS no longer reports the pool decommission as complete; refusing cleanup", + )); + } + + Ok(status) +} + +fn should_start_decommission( + existing_state: Option<&PoolLifecycleState>, + existing: Option<&PoolDecommissionStatus>, + pool_item: &RustfsPoolListItem, + request: &DecommissionRequest, +) -> Result { + if pool_item.status == "running" { + if existing + .and_then(|status| status.request_id.as_deref()) + .is_some_and(|request_id| request_id == request.request_id) + || decommission_started_after_request(pool_item, request) + { + return Ok(false); + } + + return Err(format!( + "RustFS already reports pool '{}' as decommissioning before request '{}' was observed", + pool_item.cmd_line, request.request_id + )); + } + + if matches!(existing_state, Some(PoolLifecycleState::Decommissioning)) + && existing + .and_then(|status| status.request_id.as_deref()) + .is_some_and(|request_id| request_id == request.request_id) + { + return Ok(false); + } + + Ok(true) +} + +fn decommission_started_after_request( + pool_item: &RustfsPoolListItem, + request: &DecommissionRequest, +) -> bool { + let Some(requested_at) = request.requested_at.as_deref() else { + return false; + }; + let Some(started_at) = pool_item + .decommission + .as_ref() + .and_then(|info| info.start_time.as_deref()) + else { + return false; + }; + + let Ok(requested_at) = chrono::DateTime::parse_from_rfc3339(requested_at) else { + return false; + }; + let Ok(started_at) = chrono::DateTime::parse_from_rfc3339(started_at) else { + return false; + }; + + started_at >= requested_at +} + +fn decommission_status_from_rustfs( + request: &DecommissionRequest, + status: &RustfsPoolStatus, + expected_cmd_line: &str, + expected_endpoint_set_hash: &str, +) -> Result { + validate_rustfs_pool_identity(status, expected_cmd_line, expected_endpoint_set_hash)?; + let info = status.decommission.as_ref(); + Ok(PoolDecommissionStatus { + request_id: Some(request.request_id.clone()), + rustfs_pool_id: Some(status.id.to_string()), + endpoint_set_hash: Some(endpoint_set_hash(&status.cmd_line)), + phase: Some(decommission_phase(info)), + started_at: info.and_then(|info| info.start_time.clone()), + last_poll_time: Some(now_rfc3339()), + completed_at: completed_at(info), + progress: Some(decommission_progress(info)), + cleanup: None, + last_error: None, + }) +} + +fn lifecycle_state_from_status(status: &PoolDecommissionStatus) -> PoolLifecycleState { + match status.phase { + Some(PoolDecommissionPhase::Complete) => PoolLifecycleState::Decommissioned, + Some(PoolDecommissionPhase::Canceled) => PoolLifecycleState::DecommissionCanceled, + Some(PoolDecommissionPhase::Failed) => PoolLifecycleState::DecommissionFailed, + _ => PoolLifecycleState::Decommissioning, + } +} + +fn decommission_phase(info: Option<&RustfsPoolDecommissionInfo>) -> PoolDecommissionPhase { + let Some(info) = info else { + return PoolDecommissionPhase::Running; + }; + + if info.canceled.unwrap_or(false) { + PoolDecommissionPhase::Canceled + } else if info.failed.unwrap_or(false) { + PoolDecommissionPhase::Failed + } else if info.complete.unwrap_or(false) { + PoolDecommissionPhase::Complete + } else { + PoolDecommissionPhase::Running + } +} + +fn completed_at(info: Option<&RustfsPoolDecommissionInfo>) -> Option { + let info = info?; + if info.complete.unwrap_or(false) + || info.canceled.unwrap_or(false) + || info.failed.unwrap_or(false) + { + Some(now_rfc3339()) + } else { + None + } +} + +fn decommission_progress(info: Option<&RustfsPoolDecommissionInfo>) -> PoolDecommissionProgress { + let Some(info) = info else { + return PoolDecommissionProgress::default(); + }; + + PoolDecommissionProgress { + objects_migrated: info.objects_decommissioned.map(u64_to_i64_saturating), + bytes_migrated: info.bytes_decommissioned.map(u64_to_i64_saturating), + objects_failed: info + .objects_decommissioned_failed + .map(u64_to_i64_saturating), + bytes_failed: info.bytes_decommissioned_failed.map(u64_to_i64_saturating), + } +} + +async fn cleanup_decommissioned_pool( + ctx: &Context, + tenant: &Tenant, + namespace: &str, + pool: &Pool, + mut status: PoolDecommissionStatus, +) -> PoolLifecycleDecision { + let ss_name = format!("{}-{}", tenant.name(), pool.name); + match ctx.get::(&ss_name, namespace).await { + Ok(statefulset) if statefulset.metadata.deletion_timestamp.is_none() => { + let delete_params = DeleteParams { + propagation_policy: Some(PropagationPolicy::Background), + ..DeleteParams::default() + }; + match ctx + .delete_with_params::(&ss_name, namespace, &delete_params) + .await + { + Ok(()) => { + let _ = ctx + .record( + tenant, + EventType::Normal, + "PoolDecommissionCleanupStarted", + &format!( + "Deleting StatefulSet '{}' after RustFS decommission completed; PVCs are retained", + ss_name + ), + ) + .await; + set_cleanup_status( + &mut status, + PoolDecommissionCleanupState::StatefulSetDeleting, + ); + } + Err(error) if is_not_found_context_error(&error) => { + set_cleanup_status(&mut status, PoolDecommissionCleanupState::PvcRetained); + } + Err(error) => { + warn!( + tenant = %tenant.name(), + namespace = %namespace, + statefulset = %ss_name, + %error, + "failed to delete decommissioned pool StatefulSet" + ); + status.last_error = Some(PoolDecommissionLastError { + reason: Some("StatefulSetDeleteFailed".to_string()), + message: Some( + "failed to delete decommissioned pool StatefulSet".to_string(), + ), + }); + return cleanup_retriable_decision( + status, + "StatefulSetDeleteFailed", + "failed to delete decommissioned pool StatefulSet", + ); + } + } + } + Ok(_) => { + set_cleanup_status( + &mut status, + PoolDecommissionCleanupState::StatefulSetDeleting, + ); + } + Err(error) if is_not_found_context_error(&error) => { + let was_retained = matches!( + status.cleanup.as_ref().map(|cleanup| &cleanup.state), + Some(PoolDecommissionCleanupState::PvcRetained) + ); + set_cleanup_status(&mut status, PoolDecommissionCleanupState::PvcRetained); + if !was_retained { + let _ = ctx + .record( + tenant, + EventType::Normal, + "PvcRetained", + &format!( + "StatefulSet '{}' is deleted after decommission; PVCs are retained", + ss_name + ), + ) + .await; + } + } + Err(error) => { + warn!( + tenant = %tenant.name(), + namespace = %namespace, + statefulset = %ss_name, + %error, + "failed to inspect decommissioned pool StatefulSet" + ); + status.last_error = Some(PoolDecommissionLastError { + reason: Some("StatefulSetInspectFailed".to_string()), + message: Some("failed to inspect decommissioned pool StatefulSet".to_string()), + }); + return cleanup_retriable_decision( + status, + "StatefulSetInspectFailed", + "failed to inspect decommissioned pool StatefulSet", + ); + } + } + + PoolLifecycleDecision { + state: PoolLifecycleState::Decommissioned, + decommission: Some(status), + skip_workload_reconcile: true, + } +} + +fn terminal_decision_from_existing( + existing_state: Option, + existing: Option, +) -> PoolLifecycleDecision { + let state = existing_state.unwrap_or(PoolLifecycleState::Active); + PoolLifecycleDecision { + skip_workload_reconcile: !matches!(state, PoolLifecycleState::Active), + state, + decommission: existing, + } +} + +fn active_lifecycle_decision() -> PoolLifecycleDecision { + PoolLifecycleDecision { + state: PoolLifecycleState::Active, + decommission: None, + skip_workload_reconcile: false, + } +} + +fn cancel_without_decommission_info_is_noop( + request: &DecommissionRequest, + decommission: Option<&RustfsPoolDecommissionInfo>, +) -> bool { + request.action == DecommissionAction::Cancel && decommission.is_none() +} + +fn failed_decision( + request_id: Option, + reason: &str, + message: &str, +) -> PoolLifecycleDecision { + PoolLifecycleDecision { + state: PoolLifecycleState::DecommissionFailed, + decommission: Some(PoolDecommissionStatus { + request_id, + phase: Some(PoolDecommissionPhase::Failed), + last_poll_time: Some(now_rfc3339()), + last_error: Some(PoolDecommissionLastError { + reason: Some(reason.to_string()), + message: Some(message.to_string()), + }), + ..empty_decommission_status() + }), + skip_workload_reconcile: true, + } +} + +fn retriable_decision( + request_id: Option, + reason: &str, + message: &str, +) -> PoolLifecycleDecision { + PoolLifecycleDecision { + state: PoolLifecycleState::Decommissioning, + decommission: Some(PoolDecommissionStatus { + request_id, + phase: Some(PoolDecommissionPhase::Pending), + last_poll_time: Some(now_rfc3339()), + last_error: Some(PoolDecommissionLastError { + reason: Some(reason.to_string()), + message: Some(message.to_string()), + }), + ..empty_decommission_status() + }), + skip_workload_reconcile: true, + } +} + +fn cleanup_retriable_decision( + mut status: PoolDecommissionStatus, + reason: &str, + message: &str, +) -> PoolLifecycleDecision { + if status.cleanup.is_none() { + set_cleanup_status(&mut status, PoolDecommissionCleanupState::Pending); + } + status.last_poll_time = Some(now_rfc3339()); + status.last_error = Some(PoolDecommissionLastError { + reason: Some(reason.to_string()), + message: Some(message.to_string()), + }); + + PoolLifecycleDecision { + state: PoolLifecycleState::Decommissioned, + decommission: Some(status), + skip_workload_reconcile: true, + } +} + +fn empty_decommission_status() -> PoolDecommissionStatus { + PoolDecommissionStatus { + request_id: None, + rustfs_pool_id: None, + endpoint_set_hash: None, + phase: None, + started_at: None, + last_poll_time: None, + completed_at: None, + progress: None, + cleanup: None, + last_error: None, + } +} + +fn cleanup_already_authorized_or_complete(status: &PoolDecommissionStatus) -> bool { + status.cleanup.as_ref().is_some_and(|cleanup| { + matches!( + cleanup.state, + PoolDecommissionCleanupState::StatefulSetDeleting + | PoolDecommissionCleanupState::PvcRetained + ) + }) +} + +fn decommissioned_cleanup_needs_requeue(status: &PoolDecommissionStatus) -> bool { + !status + .cleanup + .as_ref() + .is_some_and(|cleanup| matches!(cleanup.state, PoolDecommissionCleanupState::PvcRetained)) +} + +fn cleanup_status(state: PoolDecommissionCleanupState) -> PoolDecommissionCleanupStatus { + let stateful_set_deleted_at = + matches!(state, PoolDecommissionCleanupState::PvcRetained).then(now_rfc3339); + + PoolDecommissionCleanupStatus { + state, + stateful_set_deleted_at, + pvc_retention_policy: Some("Retain".to_string()), + } +} + +fn set_cleanup_status(status: &mut PoolDecommissionStatus, state: PoolDecommissionCleanupState) { + if status + .cleanup + .as_ref() + .is_some_and(|cleanup| cleanup.state == state) + { + return; + } + + status.cleanup = Some(cleanup_status(state)); +} + +fn existing_lifecycle_state(tenant: &Tenant, pool_name: &str) -> Option { + existing_pool_status(tenant, pool_name).and_then(|status| status.lifecycle_state.clone()) +} + +fn existing_decommission_status( + tenant: &Tenant, + pool_name: &str, +) -> Option { + existing_pool_status(tenant, pool_name).and_then(|status| status.decommission.clone()) +} + +fn existing_pool_status<'a>( + tenant: &'a Tenant, + pool_name: &str, +) -> Option<&'a crate::types::v1alpha1::status::pool::Pool> { + let ss_name = format!("{}-{}", tenant.name(), pool_name); + tenant + .status + .as_ref()? + .pools + .iter() + .find(|status| status.name.as_deref() == Some(pool_name) || status.ss_name == ss_name) +} + +fn endpoint_set_hash(cmd_line: &str) -> String { + let mut hasher = Sha256::new(); + hasher.update(cmd_line.as_bytes()); + format!("sha256:{}", hex::encode(hasher.finalize())) +} + +fn u64_to_i64_saturating(value: u64) -> i64 { + value.min(i64::MAX as u64) as i64 +} + +fn now_rfc3339() -> String { + chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Secs, true) +} + +fn is_not_found_context_error(error: &context::Error) -> bool { + matches!( + error, + context::Error::Kube { + source: kube::Error::Api(api_error) + } if api_error.code == 404 + ) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::types::v1alpha1::persistence::PersistenceConfig; + use crate::types::v1alpha1::pool::SchedulingConfig; + use crate::types::v1alpha1::tenant::TenantSpec; + use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta; + + fn test_pool(name: &str) -> Pool { + Pool { + name: name.to_string(), + servers: 4, + persistence: PersistenceConfig { + volumes_per_server: 2, + ..Default::default() + }, + scheduling: SchedulingConfig::default(), + } + } + + fn test_tenant(pool: Pool) -> Tenant { + Tenant { + metadata: ObjectMeta { + name: Some("logs".to_string()), + namespace: Some("rustfs-system".to_string()), + ..Default::default() + }, + spec: TenantSpec { + pools: vec![pool], + ..Default::default() + }, + status: None, + } + } + + #[test] + fn lifecycle_phase_maps_current_rustfs_terminal_flags() { + let complete = RustfsPoolDecommissionInfo { + complete: Some(true), + ..Default::default() + }; + let canceled = RustfsPoolDecommissionInfo { + canceled: Some(true), + ..Default::default() + }; + let failed = RustfsPoolDecommissionInfo { + failed: Some(true), + ..Default::default() + }; + + assert_eq!( + decommission_phase(Some(&complete)), + PoolDecommissionPhase::Complete + ); + assert_eq!( + decommission_phase(Some(&canceled)), + PoolDecommissionPhase::Canceled + ); + assert_eq!( + decommission_phase(Some(&failed)), + PoolDecommissionPhase::Failed + ); + assert_eq!(decommission_phase(None), PoolDecommissionPhase::Running); + + let canceled_and_complete = RustfsPoolDecommissionInfo { + complete: Some(true), + canceled: Some(true), + ..Default::default() + }; + assert_eq!( + decommission_phase(Some(&canceled_and_complete)), + PoolDecommissionPhase::Canceled + ); + + let failed_and_complete = RustfsPoolDecommissionInfo { + complete: Some(true), + failed: Some(true), + ..Default::default() + }; + assert_eq!( + decommission_phase(Some(&failed_and_complete)), + PoolDecommissionPhase::Failed + ); + } + + #[test] + fn decommissioned_cleanup_skips_workload_reconcile() { + let decision = terminal_decision_from_existing( + Some(PoolLifecycleState::Decommissioned), + Some(empty_decommission_status()), + ); + + assert!(decision.skip_workload_reconcile); + assert_eq!(decision.state, PoolLifecycleState::Decommissioned); + } + + #[test] + fn decommissioned_cleanup_pending_keeps_tenant_reconciling() { + let mut status = empty_decommission_status(); + status.cleanup = Some(cleanup_status( + PoolDecommissionCleanupState::StatefulSetDeleting, + )); + + let mut decisions = PoolLifecycleDecisions::default(); + decisions.insert( + "pool-a".to_string(), + PoolLifecycleDecision { + state: PoolLifecycleState::Decommissioned, + decommission: Some(status), + skip_workload_reconcile: true, + }, + ); + + assert!(decisions.any_reconciling); + assert_eq!(decisions.requeue_after, Some(POLL_INTERVAL)); + } + + #[test] + fn expected_pool_cmd_line_matches_workload_volume_format() { + let pool = test_pool("pool-a"); + let tenant = test_tenant(pool.clone()); + + assert_eq!( + expected_pool_cmd_line(&tenant, "rustfs-system", &pool).unwrap(), + "http://logs-pool-a-{0...3}.logs-hl.rustfs-system.svc.cluster.local:9000/data/rustfs{0...1}" + ); + } + + #[test] + fn decommission_started_after_request_uses_rustfs_start_time() { + let pool_item = RustfsPoolListItem { + id: 1, + cmd_line: "pool-a".to_string(), + last_update: "2026-05-20T00:00:05Z".to_string(), + total_size: None, + current_size: None, + used_size: None, + used: None, + status: "running".to_string(), + decommission: Some(RustfsPoolDecommissionInfo { + start_time: Some("2026-05-20T00:00:05Z".to_string()), + ..Default::default() + }), + }; + let request = DecommissionRequest { + pool_name: "pool-a".to_string(), + request_id: "request-1".to_string(), + action: DecommissionAction::Start, + requested_at: Some("2026-05-20T00:00:00Z".to_string()), + cancel_requested_at: None, + reason: None, + }; + + assert!(decommission_started_after_request(&pool_item, &request)); + } + + #[test] + fn running_decommission_before_request_is_not_adopted() { + let pool_item = RustfsPoolListItem { + id: 1, + cmd_line: "pool-a".to_string(), + last_update: "2026-05-20T00:00:00Z".to_string(), + total_size: None, + current_size: None, + used_size: None, + used: None, + status: "running".to_string(), + decommission: Some(RustfsPoolDecommissionInfo { + start_time: Some("2026-05-20T00:00:00Z".to_string()), + ..Default::default() + }), + }; + let request = DecommissionRequest { + pool_name: "pool-a".to_string(), + request_id: "request-1".to_string(), + action: DecommissionAction::Start, + requested_at: Some("2026-05-20T00:00:05Z".to_string()), + cancel_requested_at: None, + reason: None, + }; + + assert!(should_start_decommission(None, None, &pool_item, &request).is_err()); + } + + #[test] + fn cancel_without_decommission_info_maps_to_active_noop() { + let request = DecommissionRequest { + pool_name: "pool-a".to_string(), + request_id: "request-1".to_string(), + action: DecommissionAction::Cancel, + requested_at: None, + cancel_requested_at: Some("2026-05-20T00:00:05Z".to_string()), + reason: None, + }; + let status = RustfsPoolStatus { + id: 1, + cmd_line: "pool-a".to_string(), + last_update: "2026-05-20T00:00:10Z".to_string(), + decommission: None, + }; + + assert!(cancel_without_decommission_info_is_noop( + &request, + status.decommission.as_ref() + )); + let decision = active_lifecycle_decision(); + assert_eq!(decision.state, PoolLifecycleState::Active); + assert!(!decision.skip_workload_reconcile); + assert!(decision.decommission.is_none()); + } + + #[test] + fn start_without_decommission_info_is_not_noop() { + let request = DecommissionRequest { + pool_name: "pool-a".to_string(), + request_id: "request-1".to_string(), + action: DecommissionAction::Start, + requested_at: Some("2026-05-20T00:00:05Z".to_string()), + cancel_requested_at: None, + reason: None, + }; + let status = RustfsPoolStatus { + id: 1, + cmd_line: "pool-a".to_string(), + last_update: "2026-05-20T00:00:10Z".to_string(), + decommission: None, + }; + + assert!(!cancel_without_decommission_info_is_noop( + &request, + status.decommission.as_ref() + )); + } + + #[test] + fn cleanup_deleting_state_does_not_claim_statefulset_deleted() { + let status = cleanup_status(PoolDecommissionCleanupState::StatefulSetDeleting); + + assert_eq!( + status.state, + PoolDecommissionCleanupState::StatefulSetDeleting + ); + assert!(status.stateful_set_deleted_at.is_none()); + } +} diff --git a/src/sts/rustfs_client.rs b/src/sts/rustfs_client.rs index 2bbe6a1..bf7aa11 100644 --- a/src/sts/rustfs_client.rs +++ b/src/sts/rustfs_client.rs @@ -13,6 +13,7 @@ // limitations under the License. use std::collections::BTreeMap; +use std::time::Duration; use hmac::{Hmac, Mac}; use k8s_openapi::{ByteString, api::core::v1 as corev1}; @@ -31,8 +32,14 @@ const JSON_CONTENT_TYPE: &str = "application/json"; const ASSUME_ROLE_PATH: &str = "/"; const ADD_CANNED_POLICY_PATH: &str = "/rustfs/admin/v3/add-canned-policy"; const INFO_CANNED_POLICY_PATH: &str = "/rustfs/admin/v3/info-canned-policy"; +const POOLS_LIST_PATH: &str = "/rustfs/admin/v3/pools/list"; +const POOLS_STATUS_PATH: &str = "/rustfs/admin/v3/pools/status"; +const POOLS_DECOMMISSION_PATH: &str = "/rustfs/admin/v3/pools/decommission"; +const POOLS_CANCEL_PATH: &str = "/rustfs/admin/v3/pools/cancel"; const ADMIN_SIGNING_SERVICE: &str = "s3"; const STS_SIGNING_SERVICE: &str = "sts"; +const ADMIN_HTTP_CONNECT_TIMEOUT: Duration = Duration::from_secs(3); +const ADMIN_HTTP_REQUEST_TIMEOUT: Duration = Duration::from_secs(10); /// Credentials read from Tenant `.spec.credsSecret`. #[derive(Debug, Clone, PartialEq, Eq)] @@ -41,6 +48,59 @@ pub struct RustfsCredentials { pub secret_key: String, } +#[derive(Debug, Clone, serde::Deserialize, PartialEq)] +pub struct RustfsPoolListItem { + pub id: usize, + #[serde(rename = "cmdline")] + pub cmd_line: String, + #[serde(rename = "lastUpdate")] + pub last_update: String, + #[serde(rename = "totalSize")] + pub total_size: Option, + #[serde(rename = "currentSize")] + pub current_size: Option, + #[serde(rename = "usedSize")] + pub used_size: Option, + pub used: Option, + pub status: String, + #[serde(rename = "decommissionInfo")] + pub decommission: Option, +} + +#[derive(Debug, Clone, serde::Deserialize, PartialEq)] +pub struct RustfsPoolStatus { + pub id: usize, + #[serde(rename = "cmdline")] + pub cmd_line: String, + #[serde(rename = "lastUpdate")] + pub last_update: String, + #[serde(rename = "decommissionInfo")] + pub decommission: Option, +} + +#[derive(Debug, Clone, Default, serde::Deserialize, PartialEq)] +pub struct RustfsPoolDecommissionInfo { + #[serde(rename = "startTime")] + pub start_time: Option, + #[serde(rename = "startSize")] + pub start_size: Option, + #[serde(rename = "totalSize")] + pub total_size: Option, + #[serde(rename = "currentSize")] + pub current_size: Option, + pub complete: Option, + pub failed: Option, + pub canceled: Option, + #[serde(rename = "objectsDecommissioned")] + pub objects_decommissioned: Option, + #[serde(rename = "objectsDecommissionedFailed")] + pub objects_decommissioned_failed: Option, + #[serde(rename = "bytesDecommissioned")] + pub bytes_decommissioned: Option, + #[serde(rename = "bytesDecommissionedFailed")] + pub bytes_decommissioned_failed: Option, +} + /// Error type for RustFS admin/STS client operations. #[derive(Debug)] pub enum RustfsClientError { @@ -121,6 +181,14 @@ pub struct RustfsAdminClient { http_client: HttpClient, } +fn default_http_client() -> HttpClient { + HttpClient::builder() + .connect_timeout(ADMIN_HTTP_CONNECT_TIMEOUT) + .timeout(ADMIN_HTTP_REQUEST_TIMEOUT) + .build() + .unwrap_or_else(|_| HttpClient::new()) +} + impl RustfsAdminClient { pub const STS_VERSION: &'static str = "2011-06-15"; pub const STS_ACTION: &'static str = "AssumeRole"; @@ -130,7 +198,12 @@ impl RustfsAdminClient { access_key: impl Into, secret_key: impl Into, ) -> Self { - Self::new_with_base_url_and_http_client(base_url, access_key, secret_key, HttpClient::new()) + Self::new_with_base_url_and_http_client( + base_url, + access_key, + secret_key, + default_http_client(), + ) } pub fn new_with_base_url_and_ca_pem( @@ -141,7 +214,9 @@ impl RustfsAdminClient { ) -> Result { let certs = Certificate::from_pem_bundle(ca_pem) .map_err(|_| RustfsClientError::InvalidTenantTlsCa)?; - let mut builder = HttpClient::builder(); + let mut builder = HttpClient::builder() + .connect_timeout(ADMIN_HTTP_CONNECT_TIMEOUT) + .timeout(ADMIN_HTTP_REQUEST_TIMEOUT); for cert in certs { builder = builder.add_root_certificate(cert); } @@ -382,6 +457,48 @@ impl RustfsAdminClient { Ok(()) } + pub async fn list_pools(&self) -> Result, RustfsClientError> { + let body = self + .send_admin_request("GET", POOLS_LIST_PATH, "", "", None) + .await?; + + serde_json::from_str::>(&body) + .map_err(|_| RustfsClientError::ParseResponseFailed) + } + + pub async fn pool_status_by_id( + &self, + pool_id: &str, + ) -> Result { + let query = build_query_pairs(&[("by-id", "true"), ("pool", pool_id)]); + let body = self + .send_admin_request("GET", POOLS_STATUS_PATH, &query, "", None) + .await?; + + serde_json::from_str::(&body) + .map_err(|_| RustfsClientError::ParseResponseFailed) + } + + pub async fn start_pool_decommission_by_id( + &self, + pool_id: &str, + ) -> Result<(), RustfsClientError> { + let query = build_query_pairs(&[("by-id", "true"), ("pool", pool_id)]); + self.send_admin_request("POST", POOLS_DECOMMISSION_PATH, &query, "", None) + .await?; + Ok(()) + } + + pub async fn cancel_pool_decommission_by_id( + &self, + pool_id: &str, + ) -> Result<(), RustfsClientError> { + let query = build_query_pairs(&[("by-id", "true"), ("pool", pool_id)]); + self.send_admin_request("POST", POOLS_CANCEL_PATH, &query, "", None) + .await?; + Ok(()) + } + /// Send AssumeRole request to RustFS admin STS endpoint (`/`). pub async fn assume_role( &self, @@ -441,6 +558,68 @@ impl RustfsAdminClient { parse_assume_role_response(&body).ok_or(RustfsClientError::ParseResponseFailed) } + async fn send_admin_request( + &self, + method: &str, + path: &str, + query: &str, + body: &str, + content_type: Option<&str>, + ) -> Result { + let url = format!("{}{}", self.base_url.trim_end_matches('/'), path); + let url = if query.is_empty() { + url + } else { + format!("{url}?{query}") + }; + + let signed = self.sign_request( + method, + path, + query, + body, + content_type, + ADMIN_SIGNING_SERVICE, + )?; + let host = self.host()?; + + let builder = match method { + "GET" => self.http_client.get(url), + "POST" => self.http_client.post(url), + "PUT" => self.http_client.put(url), + _ => return Err(RustfsClientError::RequestBuildFailed), + } + .header("x-amz-date", &signed.amz_date) + .header("x-amz-content-sha256", &signed.payload_hash) + .header("authorization", &signed.authorization) + .header("host", host); + + let builder = if let Some(content_type) = content_type { + builder.header("content-type", content_type) + } else { + builder + }; + let builder = if body.is_empty() { + builder + } else { + builder.body(body.to_string()) + }; + + let response = builder + .send() + .await + .map_err(|_| RustfsClientError::RequestFailed)?; + + if !response.status().is_success() { + return Err(RustfsClientError::UnexpectedStatus(response.status())); + } + + response + .text() + .await + .map_err(|_| RustfsClientError::RequestFailed) + } + fn sign_request( &self, method: &str, @@ -941,6 +1120,135 @@ mod tests { server.abort(); } + #[tokio::test] + async fn list_pools_parses_current_rustfs_pool_shape() { + let router = Router::new().route( + POOLS_LIST_PATH, + get(|| async { + ( + StatusCode::OK, + r#"[{"id":1,"cmdline":"http://tenant-pool-a-{0...3}.tenant-hl.ns.svc.cluster.local:9000/data/rustfs{0...3}","lastUpdate":"2026-05-20T00:00:00Z","totalSize":100,"currentSize":50,"usedSize":25,"used":25.0,"status":"running","decommissionInfo":{"startTime":"2026-05-20T00:00:00Z","complete":false,"failed":false,"canceled":false,"objectsDecommissioned":7,"objectsDecommissionedFailed":1,"bytesDecommissioned":9,"bytesDecommissionedFailed":2}}]"#, + ) + }), + ); + + let listener = tokio::net::TcpListener::bind(("127.0.0.1", 0)) + .await + .unwrap(); + let addr = listener.local_addr().unwrap(); + let server = tokio::spawn(async move { axum::serve(listener, router).await.unwrap() }); + + let client = + RustfsAdminClient::new_with_base_url(format!("http://{addr}"), "access", "secret"); + + let pools = client.list_pools().await.unwrap(); + + assert_eq!(pools[0].id, 1); + assert_eq!(pools[0].status, "running"); + assert_eq!( + pools[0] + .decommission + .as_ref() + .and_then(|info| info.objects_decommissioned), + Some(7) + ); + + server.abort(); + } + + #[tokio::test] + async fn pool_decommission_start_uses_by_id_query_and_admin_signing() { + let capture = Capture::default(); + let route_capture = capture.clone(); + + let router = Router::new() + .route( + POOLS_DECOMMISSION_PATH, + post( + move |State(c): State, req: Request| async move { + *c.path.lock().await = req.uri().path().to_string(); + *c.query.lock().await = req.uri().query().unwrap_or("").to_string(); + *c.authorization.lock().await = req + .headers() + .get("authorization") + .and_then(|value| value.to_str().ok()) + .unwrap_or("") + .to_string(); + + StatusCode::OK + }, + ), + ) + .with_state(route_capture.clone()); + + let listener = tokio::net::TcpListener::bind(("127.0.0.1", 0)) + .await + .unwrap(); + let addr = listener.local_addr().unwrap(); + let server = tokio::spawn(async move { axum::serve(listener, router).await.unwrap() }); + + let client = + RustfsAdminClient::new_with_base_url(format!("http://{addr}"), "access", "secret"); + + client.start_pool_decommission_by_id("1").await.unwrap(); + + assert_eq!(&*capture.path.lock().await, POOLS_DECOMMISSION_PATH); + assert_eq!(&*capture.query.lock().await, "by-id=true&pool=1"); + assert!( + capture + .authorization + .lock() + .await + .contains("/s3/aws4_request") + ); + + server.abort(); + } + + #[tokio::test] + async fn pool_status_uses_by_id_query_and_parses_decommission_info() { + let capture = Capture::default(); + let route_capture = capture.clone(); + + let router = Router::new() + .route( + POOLS_STATUS_PATH, + get( + move |State(c): State, req: Request| async move { + *c.path.lock().await = req.uri().path().to_string(); + *c.query.lock().await = req.uri().query().unwrap_or("").to_string(); + + ( + StatusCode::OK, + r#"{"id":1,"cmdline":"http://tenant-pool-a-{0...3}.tenant-hl.ns.svc.cluster.local:9000/data/rustfs{0...3}","lastUpdate":"2026-05-20T00:00:00Z","decommissionInfo":{"startTime":"2026-05-20T00:00:00Z","complete":true,"failed":false,"canceled":false,"objectsDecommissioned":10,"objectsDecommissionedFailed":0,"bytesDecommissioned":20,"bytesDecommissionedFailed":0}}"#, + ) + }, + ), + ) + .with_state(route_capture.clone()); + + let listener = tokio::net::TcpListener::bind(("127.0.0.1", 0)) + .await + .unwrap(); + let addr = listener.local_addr().unwrap(); + let server = tokio::spawn(async move { axum::serve(listener, router).await.unwrap() }); + + let client = + RustfsAdminClient::new_with_base_url(format!("http://{addr}"), "access", "secret"); + + let status = client.pool_status_by_id("1").await.unwrap(); + + assert_eq!(status.id, 1); + assert_eq!(&*capture.path.lock().await, POOLS_STATUS_PATH); + assert_eq!(&*capture.query.lock().await, "by-id=true&pool=1"); + assert_eq!( + status.decommission.and_then(|info| info.complete), + Some(true) + ); + + server.abort(); + } + #[test] fn extract_canned_policy_document_accepts_raw_policy_document() { let raw_policy = diff --git a/src/types/v1alpha1.rs b/src/types/v1alpha1.rs index 45b3f5f..68e2d95 100755 --- a/src/types/v1alpha1.rs +++ b/src/types/v1alpha1.rs @@ -18,6 +18,7 @@ pub mod logging; pub mod persistence; pub mod policy_binding; pub mod pool; +pub mod pool_lifecycle; pub mod status; pub mod tenant; pub mod tls; diff --git a/src/types/v1alpha1/pool_lifecycle.rs b/src/types/v1alpha1/pool_lifecycle.rs new file mode 100644 index 0000000..69158f2 --- /dev/null +++ b/src/types/v1alpha1/pool_lifecycle.rs @@ -0,0 +1,79 @@ +// Copyright 2025 RustFS Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use kube::KubeSchema; +use schemars::JsonSchema; +use serde::{Deserialize, Serialize}; +use strum::Display; + +#[derive(Deserialize, Serialize, Clone, Debug, KubeSchema, Default)] +#[serde(rename_all = "camelCase")] +#[x_kube(validation = Rule::new("!has(self.decommissionRequests) || self.decommissionRequests.all(r, self.decommissionRequests.exists_one(other, other.poolName == r.poolName))"). + message("decommissionRequests must contain at most one entry per poolName"))] +pub struct PoolLifecycleSpec { + #[serde(default, skip_serializing_if = "is_default_pvc_retention_policy")] + pub pvc_retention_policy: PvcRetentionPolicy, + + #[serde(default, skip_serializing_if = "Vec::is_empty")] + pub decommission_requests: Vec, +} + +#[derive(Default, Deserialize, Serialize, Clone, Debug, JsonSchema, Display, PartialEq, Eq)] +#[serde(rename_all = "PascalCase")] +#[schemars(rename_all = "PascalCase")] +pub enum PvcRetentionPolicy { + #[strum(to_string = "Retain")] + #[default] + Retain, +} + +#[derive(Deserialize, Serialize, Clone, Debug, KubeSchema, PartialEq, Eq)] +#[serde(rename_all = "camelCase")] +pub struct DecommissionRequest { + pub pool_name: String, + pub request_id: String, + pub action: DecommissionAction, + + #[serde(default, skip_serializing_if = "Option::is_none")] + pub requested_at: Option, + + #[serde(default, skip_serializing_if = "Option::is_none")] + pub cancel_requested_at: Option, + + #[serde(default, skip_serializing_if = "Option::is_none")] + pub reason: Option, +} + +#[derive(Deserialize, Serialize, Clone, Debug, JsonSchema, Display, PartialEq, Eq)] +#[serde(rename_all = "PascalCase")] +#[schemars(rename_all = "PascalCase")] +pub enum DecommissionAction { + #[strum(to_string = "Start")] + Start, + + #[strum(to_string = "Cancel")] + Cancel, +} + +fn is_default_pvc_retention_policy(policy: &PvcRetentionPolicy) -> bool { + policy == &PvcRetentionPolicy::Retain +} + +impl PoolLifecycleSpec { + pub fn request_for_pool(&self, pool_name: &str) -> Option<&DecommissionRequest> { + self.decommission_requests + .iter() + .find(|request| request.pool_name == pool_name) + } +} diff --git a/src/types/v1alpha1/status.rs b/src/types/v1alpha1/status.rs index 7b5a77d..c3ff8bb 100755 --- a/src/types/v1alpha1/status.rs +++ b/src/types/v1alpha1/status.rs @@ -135,6 +135,10 @@ pub enum Reason { TlsHotReloadUnsupported, CertificateExpiring, PoolDeleteBlocked, + PoolDecommissioning, + PoolDecommissioned, + PoolDecommissionCanceled, + PoolDecommissionFailed, StatefulSetApplyFailed, StatefulSetUpdateValidationFailed, RolloutInProgress, @@ -178,6 +182,10 @@ impl Reason { Self::TlsHotReloadUnsupported => "TlsHotReloadUnsupported", Self::CertificateExpiring => "CertificateExpiring", Self::PoolDeleteBlocked => "PoolDeleteBlocked", + Self::PoolDecommissioning => "PoolDecommissioning", + Self::PoolDecommissioned => "PoolDecommissioned", + Self::PoolDecommissionCanceled => "PoolDecommissionCanceled", + Self::PoolDecommissionFailed => "PoolDecommissionFailed", Self::StatefulSetApplyFailed => "StatefulSetApplyFailed", Self::StatefulSetUpdateValidationFailed => "StatefulSetUpdateValidationFailed", Self::RolloutInProgress => "RolloutInProgress", @@ -422,6 +430,8 @@ pub fn is_blocked_reason(reason: &str) -> bool { | "CaBundleInvalid" | "TlsHotReloadUnsupported" | "PoolDeleteBlocked" + | "PoolDecommissionCanceled" + | "PoolDecommissionFailed" | "StatefulSetUpdateValidationFailed" ) } @@ -474,6 +484,10 @@ pub fn next_actions_for_reason(reason: &str) -> Vec<&'static str> { "InvalidTenantName" => vec!["renameTenant"], "ImmutableFieldModified" => vec!["restoreImmutableField"], "PoolDeleteBlocked" => vec!["restorePoolSpec", "startDecommissionAfterRestore"], + "PoolDecommissioning" => vec!["waitForDecommission", "inspectPoolStatus"], + "PoolDecommissioned" => vec!["removePoolSpec", "inspectRetainedPvcs"], + "PoolDecommissionCanceled" => vec!["startDecommission", "inspectPoolStatus"], + "PoolDecommissionFailed" => vec!["inspectPoolStatus", "inspectOperatorLogs"], "DecommissionRequired" => vec!["startDecommission", "inspectPoolStatus"], "StatefulSetUpdateValidationFailed" => vec!["restoreImmutableField"], "StatefulSetApplyFailed" => vec!["retry", "inspectOperatorLogs"], diff --git a/src/types/v1alpha1/status/pool.rs b/src/types/v1alpha1/status/pool.rs index 21c8c1f..7e4ab98 100755 --- a/src/types/v1alpha1/status/pool.rs +++ b/src/types/v1alpha1/status/pool.rs @@ -21,12 +21,28 @@ use strum::Display; #[derive(Deserialize, Serialize, Clone, Debug, KubeSchema, PartialEq)] #[serde(rename_all = "camelCase")] pub struct Pool { + /// Pool name from Tenant spec. Optional for backward compatibility with older status. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub name: Option, + /// Name of the StatefulSet for this pool pub ss_name: String, /// Current state of the pool pub state: PoolState, + /// Lifecycle state of the pool, separate from StatefulSet rollout state. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub lifecycle_state: Option, + + /// Workload rollout state of this pool. Mirrors `state` for compatibility. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub workload_state: Option, + + /// Decommission progress and cleanup status for this pool. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub decommission: Option, + /// Total number of non-terminated pods targeted by this pool's StatefulSet #[serde(skip_serializing_if = "Option::is_none")] pub replicas: Option, @@ -80,6 +96,132 @@ pub enum PoolState { Degraded, } +#[derive(Deserialize, Serialize, Clone, Debug, Display, PartialEq, Eq, JsonSchema)] +#[serde(rename_all = "PascalCase")] +#[schemars(rename_all = "PascalCase")] +pub enum PoolLifecycleState { + #[strum(to_string = "Active")] + Active, + + #[strum(to_string = "Decommissioning")] + Decommissioning, + + #[strum(to_string = "Decommissioned")] + Decommissioned, + + #[strum(to_string = "DecommissionCanceled")] + DecommissionCanceled, + + #[strum(to_string = "DecommissionFailed")] + DecommissionFailed, +} + +#[derive(Deserialize, Serialize, Clone, Debug, PartialEq, Eq, KubeSchema)] +#[serde(rename_all = "camelCase")] +pub struct PoolDecommissionStatus { + #[serde(default, skip_serializing_if = "Option::is_none")] + pub request_id: Option, + + #[serde(default, skip_serializing_if = "Option::is_none")] + pub rustfs_pool_id: Option, + + #[serde(default, skip_serializing_if = "Option::is_none")] + pub endpoint_set_hash: Option, + + #[serde(default, skip_serializing_if = "Option::is_none")] + pub phase: Option, + + #[serde(default, skip_serializing_if = "Option::is_none")] + pub started_at: Option, + + #[serde(default, skip_serializing_if = "Option::is_none")] + pub last_poll_time: Option, + + #[serde(default, skip_serializing_if = "Option::is_none")] + pub completed_at: Option, + + #[serde(default, skip_serializing_if = "Option::is_none")] + pub progress: Option, + + #[serde(default, skip_serializing_if = "Option::is_none")] + pub cleanup: Option, + + #[serde(default, skip_serializing_if = "Option::is_none")] + pub last_error: Option, +} + +#[derive(Deserialize, Serialize, Clone, Debug, Display, PartialEq, Eq, JsonSchema)] +#[serde(rename_all = "PascalCase")] +#[schemars(rename_all = "PascalCase")] +pub enum PoolDecommissionPhase { + #[strum(to_string = "Pending")] + Pending, + + #[strum(to_string = "Running")] + Running, + + #[strum(to_string = "Complete")] + Complete, + + #[strum(to_string = "Canceled")] + Canceled, + + #[strum(to_string = "Failed")] + Failed, +} + +#[derive(Deserialize, Serialize, Clone, Debug, Default, PartialEq, Eq, KubeSchema)] +#[serde(rename_all = "camelCase")] +pub struct PoolDecommissionProgress { + #[serde(default, skip_serializing_if = "Option::is_none")] + pub objects_migrated: Option, + + #[serde(default, skip_serializing_if = "Option::is_none")] + pub bytes_migrated: Option, + + #[serde(default, skip_serializing_if = "Option::is_none")] + pub objects_failed: Option, + + #[serde(default, skip_serializing_if = "Option::is_none")] + pub bytes_failed: Option, +} + +#[derive(Deserialize, Serialize, Clone, Debug, PartialEq, Eq, KubeSchema)] +#[serde(rename_all = "camelCase")] +pub struct PoolDecommissionCleanupStatus { + pub state: PoolDecommissionCleanupState, + + #[serde(default, skip_serializing_if = "Option::is_none")] + pub stateful_set_deleted_at: Option, + + #[serde(default, skip_serializing_if = "Option::is_none")] + pub pvc_retention_policy: Option, +} + +#[derive(Deserialize, Serialize, Clone, Debug, Display, PartialEq, Eq, JsonSchema)] +#[serde(rename_all = "PascalCase")] +#[schemars(rename_all = "PascalCase")] +pub enum PoolDecommissionCleanupState { + #[strum(to_string = "Pending")] + Pending, + + #[strum(to_string = "StatefulSetDeleting")] + StatefulSetDeleting, + + #[strum(to_string = "PvcRetained")] + PvcRetained, +} + +#[derive(Deserialize, Serialize, Clone, Debug, PartialEq, Eq, KubeSchema)] +#[serde(rename_all = "camelCase")] +pub struct PoolDecommissionLastError { + #[serde(default, skip_serializing_if = "Option::is_none")] + pub reason: Option, + + #[serde(default, skip_serializing_if = "Option::is_none")] + pub message: Option, +} + impl JsonSchema for PoolState { fn schema_name() -> Cow<'static, str> { Cow::Borrowed("State") diff --git a/src/types/v1alpha1/tenant.rs b/src/types/v1alpha1/tenant.rs index 0968729..4af84e6 100755 --- a/src/types/v1alpha1/tenant.rs +++ b/src/types/v1alpha1/tenant.rs @@ -16,6 +16,7 @@ use crate::types::v1alpha1::encryption::{EncryptionConfig, PodSecurityContextOve use crate::types::v1alpha1::k8s; use crate::types::v1alpha1::logging::LoggingConfig; use crate::types::v1alpha1::pool::Pool; +use crate::types::v1alpha1::pool_lifecycle::PoolLifecycleSpec; use crate::types::v1alpha1::tls::TlsConfig; use crate::types::{self, error::NoNamespaceSnafu}; use k8s_openapi::api::core::v1 as corev1; @@ -52,6 +53,10 @@ pub struct TenantSpec { #[x_kube(validation = Rule::new("self.size() > 0").message("pools must be configured"))] pub pools: Vec, + /// Explicit lifecycle requests for pool decommissioning. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub pool_lifecycle: Option, + #[serde( default = "helper::get_rustfs_image", skip_serializing_if = "Option::is_none" @@ -316,8 +321,12 @@ impl Tenant { Some(chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Secs, true)); crate::types::v1alpha1::status::pool::Pool { + name: Some(pool_name.to_string()), ss_name, - state, + state: state.clone(), + lifecycle_state: Some(crate::types::v1alpha1::status::pool::PoolLifecycleState::Active), + workload_state: Some(state), + decommission: None, replicas, ready_replicas, current_replicas, diff --git a/src/types/v1alpha1/tenant/workloads.rs b/src/types/v1alpha1/tenant/workloads.rs index d1d0d05..cf38f3a 100755 --- a/src/types/v1alpha1/tenant/workloads.rs +++ b/src/types/v1alpha1/tenant/workloads.rs @@ -47,33 +47,37 @@ fn stateful_name(tenant: &Tenant, pool: &Pool) -> String { } impl Tenant { + pub(crate) fn rustfs_pool_volume_spec( + &self, + pool: &Pool, + scheme: &str, + namespace: &str, + ) -> String { + let tenant_name = self.name(); + let headless_service = self.headless_service_name(); + let base_path = pool.persistence.path.as_deref().unwrap_or("/data"); + + format!( + "{scheme}://{tenant_name}-{}-{{0...{}}}.{headless_service}.{namespace}.svc.cluster.local:9000{}/rustfs{{0...{}}}", + pool.name, + pool.servers - 1, + base_path.trim_end_matches('/'), + pool.persistence.volumes_per_server - 1 + ) + } + /// Constructs the RUSTFS_VOLUMES environment variable value /// Format: http://{tenant}-{pool}-{0...servers-1}.{service}.{namespace}.svc.cluster.local:9000{path}/rustfs{0...volumes-1} /// All pools are combined into a space-separated string for a unified cluster /// Follows RustFS convention: /data/rustfs0, /data/rustfs1, etc. fn rustfs_volumes_env_value(&self, scheme: &str) -> Result { let namespace = self.namespace()?; - let tenant_name = self.name(); - let headless_service = self.headless_service_name(); - - let volume_specs: Vec = self + let volume_specs = self .spec .pools .iter() - .map(|pool| { - let base_path = pool.persistence.path.as_deref().unwrap_or("/data"); - let pool_name = &pool.name; - - // Construct volume specification with range notation - // Follows RustFS convention: /data/rustfs{0...N} - format!( - "{scheme}://{tenant_name}-{pool_name}-{{0...{}}}.{headless_service}.{namespace}.svc.cluster.local:9000{}/rustfs{{0...{}}}", - pool.servers - 1, - base_path.trim_end_matches('/'), - pool.persistence.volumes_per_server - 1 - ) - }) - .collect(); + .map(|pool| self.rustfs_pool_volume_spec(pool, scheme, &namespace)) + .collect::>(); Ok(volume_specs.join(" ")) }