Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
130 changes: 130 additions & 0 deletions src/client/pool/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,34 @@ mod internal {
pub fn is_empty(&self) -> bool {
self.shared.lock().unwrap().services.is_empty()
}

/// Remove and return one idle cached service, if any.
///
/// Unlike [`Service::call`], which wraps the taken service in a
/// [`Cached`] that returns to the pool on drop, this hands back the
/// raw service with no return-to-pool wrapper: dropping it drops the
/// service outright. Use it to release the resource an idle service
/// holds proactively, rather than waiting for it to be handed out
/// again or evicted by a [`Self::retain`] pass. Serialized against
/// `retain` by the shared lock, so a popped service is removed before
/// a retain pass can observe it.
pub fn try_pop_idle(&self) -> Option<M::Response> {
self.shared.lock().unwrap().services.pop()
}

/// Take one idle cached service, if any, wrapped so it returns to the
/// pool on drop.
///
/// Unlike [`Self::try_pop_idle`], which hands back the raw service,
/// this returns the same [`Cached`] wrapper [`Service::call`]
/// produces: dropping it re-inserts the service into the pool. Unlike
/// [`Service::call`], it never makes a new service — it returns `None`
/// when no service is idle. Serialized against [`Self::retain`] and
/// [`Self::try_pop_idle`] by the shared lock.
pub fn try_checkout_idle(&self) -> Option<Cached<M::Response>> {
let inner = self.shared.lock().unwrap().services.pop()?;
Some(Cached::new(inner, Arc::downgrade(&self.shared)))
}
}

impl<M, Dst, Ev> Service<Dst> for Cache<M, Dst, Ev>
Expand Down Expand Up @@ -313,6 +341,17 @@ mod internal {
pub fn inner_mut(&mut self) -> &mut S {
self.inner.as_mut().expect("inner only taken in drop")
}

/// Prevent this cached service from being returned to the pool.
///
/// Consumes `self`; the inner service is dropped without reinsertion,
/// regardless of whether it is still healthy. Use it when the caller
/// has learned the service should not be reused after checkout — for
/// example a connection the peer has signalled it will close — without
/// having to force its `poll_ready` to error to skip reinsertion.
pub fn discard(mut self) {
self.is_closed = true;
}
}

impl<S, Req> Service<Req> for Cached<S>
Expand Down Expand Up @@ -491,4 +530,95 @@ mod tests {
let cached = f.await.expect("call");
drop(cached);
}

#[tokio::test]
async fn test_discard_prevents_reuse() {
let (mock, mut handle) = tower_test::mock::pair();
let mut cache = super::builder().build(mock);
handle.allow(1);

std::future::poll_fn(|cx| cache.poll_ready(cx))
.await
.unwrap();
let cached = future::join(cache.call(1), async {
assert_request_eq!(handle, 1).send_response("one");
})
.await
.0
.expect("call");

// Discarding (rather than dropping) must not return the service to the
// pool: the idle set stays empty.
cached.discard();
assert!(cache.is_empty());
}

#[tokio::test]
async fn test_try_pop_idle_removes_one() {
let (mock, mut handle) = tower_test::mock::pair();
let mut cache = super::builder().build(mock);
handle.allow(1);

std::future::poll_fn(|cx| cache.poll_ready(cx))
.await
.unwrap();
let cached = future::join(cache.call(1), async {
assert_request_eq!(handle, 1).send_response("one");
})
.await
.0
.expect("call");
// Returning the checkout to the pool leaves one idle service.
drop(cached);
assert!(!cache.is_empty());

// The idle service is popped and the cache is left empty.
assert!(cache.try_pop_idle().is_some());
assert!(cache.is_empty());
// Nothing left to pop.
assert!(cache.try_pop_idle().is_none());
}

#[tokio::test]
async fn test_try_checkout_idle_returns_to_pool_on_drop() {
let (mock, mut handle) = tower_test::mock::pair();
let mut cache = super::builder().build(mock);
// Only one service is ever made: a successful checkout-and-reuse must
// not start a second connection.
handle.allow(1);

std::future::poll_fn(|cx| cache.poll_ready(cx))
.await
.unwrap();
let cached = future::join(cache.call(1), async {
assert_request_eq!(handle, 1).send_response("one");
})
.await
.0
.expect("call");
drop(cached);
assert!(!cache.is_empty());

// Checking out the idle service removes it from the idle set.
let checked_out = cache.try_checkout_idle();
assert!(checked_out.is_some());
assert!(cache.is_empty());

// Dropping the checkout returns the service to the pool (unlike
// `try_pop_idle`), so it can be checked out again — and since only one
// service was ever made, this reuses the same one.
drop(checked_out);
assert!(!cache.is_empty());
assert!(cache.try_checkout_idle().is_some());
}

#[tokio::test]
async fn test_try_take_idle_on_empty_cache_is_none() {
let (mock, _handle) = tower_test::mock::pair::<i32, &'static str>();
let cache = super::builder().build(mock);

// No idle service. Unlike `Service::call`, neither take makes a new one.
assert!(cache.try_checkout_idle().is_none());
assert!(cache.try_pop_idle().is_none());
}
}