From 28319715bca78f96442da04750b605cf51848c66 Mon Sep 17 00:00:00 2001 From: Aaron J Todd Date: Sat, 27 Jun 2026 10:23:34 -0400 Subject: [PATCH] feat(client): add idle-management methods to the composable pool Cache Add three methods to `client::pool` for managing idle cached services explicitly: - `Cached::discard(self)` consumes the checkout and drops the inner service without returning it to the pool. - `Cache::try_pop_idle()` removes and returns one idle service raw, with no return-to-pool wrapper. - `Cache::try_checkout_idle()` takes one idle service wrapped so it returns to the pool on drop, and makes no new service when none is idle. `Cache` previously exposed only `call` (take a service, making one when the pool is empty) and `retain` (bulk eviction by predicate). These cover two operations a pool consumer otherwise cannot express: dropping a checked-out service that has become unusable without forcing its `poll_ready` to error, and acting on existing idle capacity without making a new service. The methods are thin wrappers over the existing shared service list, serialized by the same lock as `call` and `retain`. `discard` sets the `is_closed` flag the `Drop` impl already checks. No existing behavior changes. --- src/client/pool/cache.rs | 130 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 130 insertions(+) diff --git a/src/client/pool/cache.rs b/src/client/pool/cache.rs index 699fe6f0..520f0bd5 100644 --- a/src/client/pool/cache.rs +++ b/src/client/pool/cache.rs @@ -173,6 +173,34 @@ mod internal { pub fn is_empty(&self) -> bool { self.shared.lock().unwrap().services.is_empty() } + + /// Remove and return one idle cached service, if any. + /// + /// Unlike [`Service::call`], which wraps the taken service in a + /// [`Cached`] that returns to the pool on drop, this hands back the + /// raw service with no return-to-pool wrapper: dropping it drops the + /// service outright. Use it to release the resource an idle service + /// holds proactively, rather than waiting for it to be handed out + /// again or evicted by a [`Self::retain`] pass. Serialized against + /// `retain` by the shared lock, so a popped service is removed before + /// a retain pass can observe it. + pub fn try_pop_idle(&self) -> Option { + self.shared.lock().unwrap().services.pop() + } + + /// Take one idle cached service, if any, wrapped so it returns to the + /// pool on drop. + /// + /// Unlike [`Self::try_pop_idle`], which hands back the raw service, + /// this returns the same [`Cached`] wrapper [`Service::call`] + /// produces: dropping it re-inserts the service into the pool. Unlike + /// [`Service::call`], it never makes a new service — it returns `None` + /// when no service is idle. Serialized against [`Self::retain`] and + /// [`Self::try_pop_idle`] by the shared lock. + pub fn try_checkout_idle(&self) -> Option> { + let inner = self.shared.lock().unwrap().services.pop()?; + Some(Cached::new(inner, Arc::downgrade(&self.shared))) + } } impl Service for Cache @@ -313,6 +341,17 @@ mod internal { pub fn inner_mut(&mut self) -> &mut S { self.inner.as_mut().expect("inner only taken in drop") } + + /// Prevent this cached service from being returned to the pool. + /// + /// Consumes `self`; the inner service is dropped without reinsertion, + /// regardless of whether it is still healthy. Use it when the caller + /// has learned the service should not be reused after checkout — for + /// example a connection the peer has signalled it will close — without + /// having to force its `poll_ready` to error to skip reinsertion. + pub fn discard(mut self) { + self.is_closed = true; + } } impl Service for Cached @@ -491,4 +530,95 @@ mod tests { let cached = f.await.expect("call"); drop(cached); } + + #[tokio::test] + async fn test_discard_prevents_reuse() { + let (mock, mut handle) = tower_test::mock::pair(); + let mut cache = super::builder().build(mock); + handle.allow(1); + + std::future::poll_fn(|cx| cache.poll_ready(cx)) + .await + .unwrap(); + let cached = future::join(cache.call(1), async { + assert_request_eq!(handle, 1).send_response("one"); + }) + .await + .0 + .expect("call"); + + // Discarding (rather than dropping) must not return the service to the + // pool: the idle set stays empty. + cached.discard(); + assert!(cache.is_empty()); + } + + #[tokio::test] + async fn test_try_pop_idle_removes_one() { + let (mock, mut handle) = tower_test::mock::pair(); + let mut cache = super::builder().build(mock); + handle.allow(1); + + std::future::poll_fn(|cx| cache.poll_ready(cx)) + .await + .unwrap(); + let cached = future::join(cache.call(1), async { + assert_request_eq!(handle, 1).send_response("one"); + }) + .await + .0 + .expect("call"); + // Returning the checkout to the pool leaves one idle service. + drop(cached); + assert!(!cache.is_empty()); + + // The idle service is popped and the cache is left empty. + assert!(cache.try_pop_idle().is_some()); + assert!(cache.is_empty()); + // Nothing left to pop. + assert!(cache.try_pop_idle().is_none()); + } + + #[tokio::test] + async fn test_try_checkout_idle_returns_to_pool_on_drop() { + let (mock, mut handle) = tower_test::mock::pair(); + let mut cache = super::builder().build(mock); + // Only one service is ever made: a successful checkout-and-reuse must + // not start a second connection. + handle.allow(1); + + std::future::poll_fn(|cx| cache.poll_ready(cx)) + .await + .unwrap(); + let cached = future::join(cache.call(1), async { + assert_request_eq!(handle, 1).send_response("one"); + }) + .await + .0 + .expect("call"); + drop(cached); + assert!(!cache.is_empty()); + + // Checking out the idle service removes it from the idle set. + let checked_out = cache.try_checkout_idle(); + assert!(checked_out.is_some()); + assert!(cache.is_empty()); + + // Dropping the checkout returns the service to the pool (unlike + // `try_pop_idle`), so it can be checked out again — and since only one + // service was ever made, this reuses the same one. + drop(checked_out); + assert!(!cache.is_empty()); + assert!(cache.try_checkout_idle().is_some()); + } + + #[tokio::test] + async fn test_try_take_idle_on_empty_cache_is_none() { + let (mock, _handle) = tower_test::mock::pair::(); + let cache = super::builder().build(mock); + + // No idle service. Unlike `Service::call`, neither take makes a new one. + assert!(cache.try_checkout_idle().is_none()); + assert!(cache.try_pop_idle().is_none()); + } }