Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
174 changes: 163 additions & 11 deletions crates/openshell-server/src/compute/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ type DriverWatchStream = Pin<Box<dyn Stream<Item = Result<WatchSandboxesEvent, S
type SharedComputeDriver =
Arc<dyn ComputeDriver<WatchSandboxesStream = DriverWatchStream> + Send + Sync>;

const DELETE_PHASE_CAS_RETRY_LIMIT: usize = 3;

#[tonic::async_trait]
trait ShutdownCleanup: Send + Sync {
async fn cleanup_on_shutdown(&self) -> Result<(), String>;
Expand Down Expand Up @@ -540,17 +542,7 @@ impl ComputeRuntime {

let id = sandbox.object_id().to_string();

// Use CAS to set phase to Deleting
// TODO: Accept expected_version from DeleteSandboxRequest for proper client-driven CAS
let sandbox = self
.store
.update_message_cas::<Sandbox, _>(&id, 0, |s| {
s.set_phase(SandboxPhase::Deleting as i32);
})
.await
.map_err(|e| {
crate::grpc::persistence_error_to_status(e, "set sandbox phase to Deleting")
})?;
let sandbox = self.set_sandbox_phase_deleting_with_retry(&id).await?;

self.sandbox_index.update_from_sandbox(&sandbox);
self.sandbox_watch_bus.notify(&id);
Expand All @@ -574,6 +566,114 @@ impl ComputeRuntime {
Ok(deleted)
}

async fn set_sandbox_phase_deleting_with_retry(
&self,
sandbox_id: &str,
) -> Result<Sandbox, Status> {
self.set_sandbox_phase_deleting_with_initial_snapshot(sandbox_id, None)
.await
}

async fn set_sandbox_phase_deleting_with_initial_snapshot(
&self,
sandbox_id: &str,
mut initial_snapshot: Option<Sandbox>,
) -> Result<Sandbox, Status> {
let operation = "set sandbox phase to Deleting";

for attempt in 1..=DELETE_PHASE_CAS_RETRY_LIMIT {
let sandbox = match initial_snapshot.take() {
Some(sandbox) => sandbox,
None => self
.store
.get_message::<Sandbox>(sandbox_id)
.await
.map_err(|e| Status::internal(format!("fetch sandbox failed: {e}")))?
.ok_or_else(|| Status::not_found("sandbox not found"))?,
};

match self
.write_sandbox_phase_deleting_from_snapshot(sandbox)
.await
{
Ok(sandbox) => {
if attempt > 1 {
debug!(
sandbox_id,
attempt, "Retried sandbox delete phase transition after CAS conflict"
);
}
return Ok(sandbox);
}
Err(crate::persistence::PersistenceError::Conflict {
current_resource_version,
}) => {
let err = crate::persistence::PersistenceError::Conflict {
current_resource_version,
};
if attempt == DELETE_PHASE_CAS_RETRY_LIMIT {
return Err(crate::grpc::persistence_error_to_status(err, operation));
}
debug!(
sandbox_id,
attempt,
current_resource_version,
"Sandbox delete phase transition conflicted; retrying"
);
tokio::task::yield_now().await;
}
Err(err) => return Err(crate::grpc::persistence_error_to_status(err, operation)),
}
}

unreachable!("delete phase retry loop always returns")
}

async fn write_sandbox_phase_deleting_from_snapshot(
&self,
mut sandbox: Sandbox,
) -> crate::persistence::PersistenceResult<Sandbox> {
let id = sandbox.object_id().to_string();
let name = sandbox.object_name().to_string();
let expected_resource_version = sandbox
.metadata
.as_ref()
.map_or(0, |metadata| metadata.resource_version);

sandbox.set_phase(SandboxPhase::Deleting as i32);

let labels_json = sandbox
.metadata
.as_ref()
.map(|metadata| &metadata.labels)
.filter(|labels| !labels.is_empty())
.map(serde_json::to_string)
.transpose()
.map_err(|e| {
crate::persistence::PersistenceError::Encode(format!(
"failed to serialize labels: {e}"
))
})?;

let result = self
.store
.put_if(
Sandbox::object_type(),
&id,
&name,
&sandbox.encode_to_vec(),
labels_json.as_deref(),
WriteCondition::MatchResourceVersion(expected_resource_version),
)
.await?;

if let Some(metadata) = sandbox.metadata.as_mut() {
metadata.resource_version = result.resource_version;
}

Ok(sandbox)
}

pub fn spawn_watchers(&self, shutdown_rx: watch::Receiver<bool>) {
let runtime = Arc::new(self.clone());
if self.store.is_single_replica() {
Expand Down Expand Up @@ -2536,6 +2636,58 @@ mod tests {
));
}

#[tokio::test]
async fn set_sandbox_phase_deleting_retries_after_stale_snapshot_conflict() {
let runtime = test_runtime(Arc::new(TestDriver::default())).await;
let sandbox = sandbox_record("sb-1", "sandbox-a", SandboxPhase::Ready);
runtime.store.put_message(&sandbox).await.unwrap();

let stale_snapshot = runtime
.store
.get_message::<Sandbox>("sb-1")
.await
.unwrap()
.unwrap();

runtime
.store
.update_message_cas::<Sandbox, _>("sb-1", 0, |sandbox| {
sandbox.set_current_policy_version(7);
})
.await
.unwrap();

let updated = runtime
.set_sandbox_phase_deleting_with_initial_snapshot("sb-1", Some(stale_snapshot))
.await
.unwrap();

assert_eq!(
SandboxPhase::try_from(updated.phase()).unwrap(),
SandboxPhase::Deleting
);
assert_eq!(updated.current_policy_version(), 7);
assert_eq!(
updated
.metadata
.as_ref()
.map_or(0, |metadata| metadata.resource_version),
3
);

let stored = runtime
.store
.get_message::<Sandbox>("sb-1")
.await
.unwrap()
.unwrap();
assert_eq!(
SandboxPhase::try_from(stored.phase()).unwrap(),
SandboxPhase::Deleting
);
assert_eq!(stored.current_policy_version(), 7);
}

#[tokio::test]
async fn apply_sandbox_update_allows_delete_failures_to_recover() {
let runtime = test_runtime(Arc::new(TestDriver::default())).await;
Expand Down
Loading