From c6f6b7f0b621d93de8d6eb75fe925a6bb2503fe6 Mon Sep 17 00:00:00 2001
From: Taylor Mutch
Date: Mon, 15 Jun 2026 10:08:53 -0700
Subject: [PATCH] fix(server): retry sandbox delete phase conflicts

Signed-off-by: Taylor Mutch
---
Review notes (this patch was hand-edited after it was generated):

- The retry loop is now a `loop` with an attempt counter. This removes the
  trailing `unreachable!`, which would have panicked if
  DELETE_PHASE_CAS_RETRY_LIMIT were ever set to 0, and drops the
  destructure-then-rebuild of `PersistenceError::Conflict`. Behaviour with
  the current limit of 3 is unchanged.
- Doc comments were added to the three new functions.
- Hunk 3 previously read `-566,6 +566,114`. The first two hunks shift later
  lines by -8, so the new-side start was recomputed as 558 from the old-side
  start. The old-side start itself was not checked against the pre-image.
- Hunk counts, the new-side start of hunk 4 and the diffstat were updated for
  the added lines. The `index` line still carries the blob ids of the
  originally generated patch.
- `<...>` generic arguments had been lost from this copy. They are restored
  where the surrounding code fixes the type (`Sandbox`, `Status`). They are
  left as found in context and removed lines and in `update_message_cas::(`,
  where the pre-image or the generic arity is not visible.

 crates/openshell-server/src/compute/mod.rs | 199 +++++++++++++++++++--
 1 file changed, 188 insertions(+), 11 deletions(-)

diff --git a/crates/openshell-server/src/compute/mod.rs b/crates/openshell-server/src/compute/mod.rs
index 6d687fb7c..272c3907f 100644
--- a/crates/openshell-server/src/compute/mod.rs
+++ b/crates/openshell-server/src/compute/mod.rs
@@ -51,6 +51,8 @@
 type DriverWatchStream =
     Pin + Send + Sync>;
 
+const DELETE_PHASE_CAS_RETRY_LIMIT: usize = 3;
+
 #[tonic::async_trait]
 trait ShutdownCleanup: Send + Sync {
     async fn cleanup_on_shutdown(&self) -> Result<(), String>;
@@ -540,17 +542,7 @@ impl ComputeRuntime {
 
         let id = sandbox.object_id().to_string();
 
-        // Use CAS to set phase to Deleting
-        // TODO: Accept expected_version from DeleteSandboxRequest for proper client-driven CAS
-        let sandbox = self
-            .store
-            .update_message_cas::(&id, 0, |s| {
-                s.set_phase(SandboxPhase::Deleting as i32);
-            })
-            .await
-            .map_err(|e| {
-                crate::grpc::persistence_error_to_status(e, "set sandbox phase to Deleting")
-            })?;
+        let sandbox = self.set_sandbox_phase_deleting_with_retry(&id).await?;
         self.sandbox_index.update_from_sandbox(&sandbox);
         self.sandbox_watch_bus.notify(&id);
 
@@ -566,6 +558,139 @@ impl ComputeRuntime {
         Ok(deleted)
     }
 
+    /// Moves the sandbox record into the `Deleting` phase, retrying when the
+    /// conditional write conflicts.
+    ///
+    /// No snapshot is supplied, so the first attempt reads the record from the
+    /// store.
+    async fn set_sandbox_phase_deleting_with_retry(
+        &self,
+        sandbox_id: &str,
+    ) -> Result<Sandbox, Status> {
+        self.set_sandbox_phase_deleting_with_initial_snapshot(sandbox_id, None)
+            .await
+    }
+
+    /// Sets the sandbox phase to `Deleting` through a write conditional on the
+    /// record's resource version, making at most
+    /// `DELETE_PHASE_CAS_RETRY_LIMIT` attempts and always at least one.
+    ///
+    /// The first attempt uses `initial_snapshot` when it is `Some`; every other
+    /// attempt re-reads the record so the retry carries the stored resource
+    /// version.
+    ///
+    /// # Errors
+    ///
+    /// Returns `Status::internal` when the read fails, `Status::not_found` when
+    /// the record is missing, and the status built by
+    /// `persistence_error_to_status` for any write error other than a conflict
+    /// that still has attempts left.
+    async fn set_sandbox_phase_deleting_with_initial_snapshot(
+        &self,
+        sandbox_id: &str,
+        mut initial_snapshot: Option<Sandbox>,
+    ) -> Result<Sandbox, Status> {
+        let operation = "set sandbox phase to Deleting";
+        let mut attempt: usize = 1;
+
+        loop {
+            let sandbox = match initial_snapshot.take() {
+                Some(sandbox) => sandbox,
+                None => self
+                    .store
+                    .get_message::<Sandbox>(sandbox_id)
+                    .await
+                    .map_err(|e| Status::internal(format!("fetch sandbox failed: {e}")))?
+                    .ok_or_else(|| Status::not_found("sandbox not found"))?,
+            };
+
+            match self
+                .write_sandbox_phase_deleting_from_snapshot(sandbox)
+                .await
+            {
+                Ok(sandbox) => {
+                    if attempt > 1 {
+                        debug!(
+                            sandbox_id,
+                            attempt, "Retried sandbox delete phase transition after CAS conflict"
+                        );
+                    }
+                    return Ok(sandbox);
+                }
+                // Retry only while attempts remain; a conflict on the final
+                // attempt falls through to the generic error arm below.
+                Err(crate::persistence::PersistenceError::Conflict {
+                    current_resource_version,
+                }) if attempt < DELETE_PHASE_CAS_RETRY_LIMIT => {
+                    debug!(
+                        sandbox_id,
+                        attempt,
+                        current_resource_version,
+                        "Sandbox delete phase transition conflicted; retrying"
+                    );
+                    attempt += 1;
+                    tokio::task::yield_now().await;
+                }
+                Err(err) => return Err(crate::grpc::persistence_error_to_status(err, operation)),
+            }
+        }
+    }
+
+    /// Writes `sandbox` with its phase set to `Deleting`, conditional on the
+    /// resource version carried by the snapshot (`0` when it has no metadata).
+    ///
+    /// On success the snapshot's metadata, if present, is updated with the
+    /// resource version reported by the store before it is returned.
+    ///
+    /// # Errors
+    ///
+    /// Returns `PersistenceError::Encode` when the labels cannot be serialized
+    /// and propagates any error from `put_if`.
+    async fn write_sandbox_phase_deleting_from_snapshot(
+        &self,
+        mut sandbox: Sandbox,
+    ) -> crate::persistence::PersistenceResult<Sandbox> {
+        let id = sandbox.object_id().to_string();
+        let name = sandbox.object_name().to_string();
+        let expected_resource_version = sandbox
+            .metadata
+            .as_ref()
+            .map_or(0, |metadata| metadata.resource_version);
+
+        sandbox.set_phase(SandboxPhase::Deleting as i32);
+
+        let labels_json = sandbox
+            .metadata
+            .as_ref()
+            .map(|metadata| &metadata.labels)
+            .filter(|labels| !labels.is_empty())
+            .map(serde_json::to_string)
+            .transpose()
+            .map_err(|e| {
+                crate::persistence::PersistenceError::Encode(format!(
+                    "failed to serialize labels: {e}"
+                ))
+            })?;
+
+        let result = self
+            .store
+            .put_if(
+                Sandbox::object_type(),
+                &id,
+                &name,
+                &sandbox.encode_to_vec(),
+                labels_json.as_deref(),
+                WriteCondition::MatchResourceVersion(expected_resource_version),
+            )
+            .await?;
+
+        if let Some(metadata) = sandbox.metadata.as_mut() {
+            metadata.resource_version = result.resource_version;
+        }
+
+        Ok(sandbox)
+    }
+
     pub fn spawn_watchers(&self, shutdown_rx: watch::Receiver) {
         let runtime = Arc::new(self.clone());
         if self.store.is_single_replica() {
@@ -2536,6 +2661,58 @@ mod tests {
         ));
     }
 
+    #[tokio::test]
+    async fn set_sandbox_phase_deleting_retries_after_stale_snapshot_conflict() {
+        let runtime = test_runtime(Arc::new(TestDriver::default())).await;
+        let sandbox = sandbox_record("sb-1", "sandbox-a", SandboxPhase::Ready);
+        runtime.store.put_message(&sandbox).await.unwrap();
+
+        let stale_snapshot = runtime
+            .store
+            .get_message::<Sandbox>("sb-1")
+            .await
+            .unwrap()
+            .unwrap();
+
+        runtime
+            .store
+            .update_message_cas::("sb-1", 0, |sandbox| {
+                sandbox.set_current_policy_version(7);
+            })
+            .await
+            .unwrap();
+
+        let updated = runtime
+            .set_sandbox_phase_deleting_with_initial_snapshot("sb-1", Some(stale_snapshot))
+            .await
+            .unwrap();
+
+        assert_eq!(
+            SandboxPhase::try_from(updated.phase()).unwrap(),
+            SandboxPhase::Deleting
+        );
+        assert_eq!(updated.current_policy_version(), 7);
+        assert_eq!(
+            updated
+                .metadata
+                .as_ref()
+                .map_or(0, |metadata| metadata.resource_version),
+            3
+        );
+
+        let stored = runtime
+            .store
+            .get_message::<Sandbox>("sb-1")
+            .await
+            .unwrap()
+            .unwrap();
+        assert_eq!(
+            SandboxPhase::try_from(stored.phase()).unwrap(),
+            SandboxPhase::Deleting
+        );
+        assert_eq!(stored.current_policy_version(), 7);
+    }
+
     #[tokio::test]
     async fn apply_sandbox_update_allows_delete_failures_to_recover() {
         let runtime = test_runtime(Arc::new(TestDriver::default())).await;