From 89cc10d43493dc1a363686ade8d1c178ed02da52 Mon Sep 17 00:00:00 2001 From: FUJI Goro Date: Sat, 30 May 2026 16:46:38 +0900 Subject: [PATCH] Reclaim host stream/future transmits when the guest drops its end When the guest drops its end of a stream/future while the host consumer/producer is still `HostReady`, the host end was never finalized, so the `TransmitState` and both handles leaked from the concurrent-state table (eventually trapping with "resource table has no free keys"). The `HostReady` arms of `host_drop_reader` and `host_drop_writer` were no-ops; both now `delete_transmit`. `host_drop_writer` only finalizes once the writer is actually `Dropped`. Adds two `component-async-tests` regression tests (one per path) that fail on `main` and pass here. Fixes #13514. Assisted-by: Claude Code --- .../tests/scenario/streams.rs | 117 ++++++++++++++++++ .../component-async-tests/tests/test_all.rs | 3 +- .../misc/component-async-tests/wit/test.wit | 12 ++ .../src/bin/async_host_consumer_drop.rs | 32 +++++ .../concurrent/futures_and_streams.rs | 31 ++++- 5 files changed, 192 insertions(+), 3 deletions(-) create mode 100644 crates/test-programs/src/bin/async_host_consumer_drop.rs diff --git a/crates/misc/component-async-tests/tests/scenario/streams.rs b/crates/misc/component-async-tests/tests/scenario/streams.rs index f54c95ea15eb..5c6f29be3426 100644 --- a/crates/misc/component-async-tests/tests/scenario/streams.rs +++ b/crates/misc/component-async-tests/tests/scenario/streams.rs @@ -303,6 +303,123 @@ pub async fn async_closed_stream() -> Result<()> { .await? } +mod host_consumer_drop { + wasmtime::component::bindgen!({ + path: "wit", + world: "host-consumer-drop-guest", + exports: { default: store | async }, + }); +} + +// Regression test: a host *consumer* registered via `StreamReader::pipe` must be +// finalized when the guest drops the writable end *after* the consumer is +// attached. The guest hands the host the readable end, keeps the writable end, +// writes one byte once the consumer reads, then drops the writer. That reaches +// `host_drop_writer` with the read side in `ReadState::HostReady`, which must +// reclaim the transmit rather than leaving it stranded. +#[tokio::test] +pub async fn async_host_consumer_drop() -> Result<()> { + let engine = Engine::new(&config())?; + + let component = make_component( + &engine, + &[test_programs_artifacts::ASYNC_HOST_CONSUMER_DROP_COMPONENT], + ) + .await?; + + let mut linker = Linker::new(&engine); + + wasmtime_wasi::p2::add_to_linker_async(&mut linker)?; + + let mut store = Store::new( + &engine, + Ctx { + wasi: WasiCtxBuilder::new().inherit_stdio().build(), + table: ResourceTable::default(), + continue_: false, + }, + ); + + let instance = linker.instantiate_async(&mut store, &component).await?; + let guest = host_consumer_drop::HostConsumerDropGuest::new(&mut store, &instance)?; + store + .run_concurrent(async move |accessor| { + let stream = guest + .local_local_host_consumer_drop() + .call_get(accessor) + .await?; + + let (tx, mut rx) = mpsc::channel(1); + accessor.with(move |store| stream.pipe(store, PipeConsumer::new(tx)))?; + assert_eq!(rx.next().await, Some(42)); + assert!(rx.next().await.is_none()); + + wasmtime::error::Ok(()) + }) + .await??; + + // The host consumer and both transmit handles must be gone now that the + // guest dropped its end. + store.assert_concurrent_state_empty(); + + Ok(()) +} + +// Regression test: the symmetric host *producer* case. The host hands the guest +// a `future` via `FutureReader::new` and the guest drops the read end without +// reading it. That reaches `host_drop_reader` with the write side in +// `WriteState::HostReady`, which must reclaim the transmit. The guest's +// `read_future` reads `rx` but drops `rx_ignored`. +#[tokio::test] +pub async fn async_host_producer_drop() -> Result<()> { + let engine = Engine::new(&config())?; + + let component = make_component( + &engine, + &[test_programs_artifacts::ASYNC_CLOSED_STREAMS_COMPONENT], + ) + .await?; + + let mut linker = Linker::new(&engine); + + wasmtime_wasi::p2::add_to_linker_async(&mut linker)?; + + let mut store = Store::new( + &engine, + Ctx { + wasi: WasiCtxBuilder::new().inherit_stdio().build(), + table: ResourceTable::default(), + continue_: false, + }, + ); + + let instance = linker.instantiate_async(&mut store, &component).await?; + + let value = 42_u8; + let (tx, rx) = oneshot::channel(); + let rx = FutureReader::new(&mut store, OneshotProducer::new(rx))?; + let (_, rx_ignored) = oneshot::channel(); + let rx_ignored = FutureReader::new(&mut store, OneshotProducer::new(rx_ignored))?; + + let closed_streams = closed_streams::bindings::ClosedStreams::new(&mut store, &instance)?; + + store + .run_concurrent(async move |accessor| { + _ = tx.send(value); + closed_streams + .local_local_closed() + .call_read_future(accessor, rx, value, rx_ignored) + .await + }) + .await??; + + // The host producer behind `rx_ignored` and both transmit handles must be + // gone now that the guest dropped the read end without reading. + store.assert_concurrent_state_empty(); + + Ok(()) +} + #[tokio::test] pub async fn async_cross_instance_source() -> Result<()> { let engine = Engine::new(&config())?; diff --git a/crates/misc/component-async-tests/tests/test_all.rs b/crates/misc/component-async-tests/tests/test_all.rs index 92ccf5ae77d4..7b3e6ce64010 100644 --- a/crates/misc/component-async-tests/tests/test_all.rs +++ b/crates/misc/component-async-tests/tests/test_all.rs @@ -31,7 +31,8 @@ use scenario::round_trip_many::{ async_round_trip_many_synchronous, async_round_trip_many_wait, }; use scenario::streams::{ - async_closed_stream, async_closed_streams, async_cross_instance_source, async_short_reads, + async_closed_stream, async_closed_streams, async_cross_instance_source, + async_host_consumer_drop, async_short_reads, }; use scenario::transmit::{ async_cancel_callee, async_cancel_caller, async_cancel_transmit, async_intertask_communication, diff --git a/crates/misc/component-async-tests/wit/test.wit b/crates/misc/component-async-tests/wit/test.wit index 84efb8ec40a0..6a0ac6817cbd 100644 --- a/crates/misc/component-async-tests/wit/test.wit +++ b/crates/misc/component-async-tests/wit/test.wit @@ -187,6 +187,14 @@ interface closed-stream { get: func() -> stream; } +interface host-consumer-drop { + // Returns the readable end of a fresh stream while the guest keeps the + // writable end. Once a consumer is attached the guest writes one byte and + // then drops the writer, so the host consumer observes a clean close while + // its read side is still `HostReady`. + get: async func() -> stream; +} + interface short-reads { resource thing { constructor(s: string); @@ -369,6 +377,10 @@ world closed-stream-guest { export closed-stream; } +world host-consumer-drop-guest { + export host-consumer-drop; +} + world short-reads-guest { export short-reads; } diff --git a/crates/test-programs/src/bin/async_host_consumer_drop.rs b/crates/test-programs/src/bin/async_host_consumer_drop.rs new file mode 100644 index 000000000000..598c527b8093 --- /dev/null +++ b/crates/test-programs/src/bin/async_host_consumer_drop.rs @@ -0,0 +1,32 @@ +mod bindings { + wit_bindgen::generate!({ + path: "../misc/component-async-tests/wit", + world: "host-consumer-drop-guest", + async: true, + }); + + use super::Component; + export!(Component); +} + +use {bindings::exports::local::local::host_consumer_drop::Guest, wit_bindgen::StreamReader}; + +struct Component; + +impl Guest for Component { + async fn get() -> StreamReader { + let (mut tx, rx) = bindings::wit_stream::new(); + // Keep the writable end and hand the readable end to the host. The host + // attaches a consumer (read side -> `HostReady`); the write below blocks + // until that consumer reads, after which we drop the writer. Dropping it + // while the consumer is still `HostReady` is the path that used to leak. + wit_bindgen::spawn(async move { + assert!(tx.write_one(42).await.is_none()); + drop(tx); + }); + rx + } +} + +// Unused function; required since this file is built as a `bin`: +fn main() {} diff --git a/crates/wasmtime/src/runtime/component/concurrent/futures_and_streams.rs b/crates/wasmtime/src/runtime/component/concurrent/futures_and_streams.rs index d1b5510ee214..c369efab5f74 100644 --- a/crates/wasmtime/src/runtime/component/concurrent/futures_and_streams.rs +++ b/crates/wasmtime/src/runtime/component/concurrent/futures_and_streams.rs @@ -2471,7 +2471,18 @@ impl StoreOpaque { )?; } - WriteState::HostReady { .. } => {} + WriteState::HostReady { .. } => { + // A host producer (e.g. one installed via `FutureReader::new`) + // is only driven when a reader pulls from it; it is never + // re-polled to observe the guest dropping the read end. The read + // end is already `Dropped` (set at the top of this function), so + // the produced value can never be consumed. Reclaim the transmit + // (state + both handles) here; otherwise it would leak for the + // lifetime of the instance. The producer is dropped along with + // the matched `HostReady` value. + log::trace!("host_drop_reader: finalize host producer, delete {transmit_id:?}"); + state.delete_transmit(transmit_id)?; + } WriteState::Open => { state.update_event( @@ -2570,7 +2581,23 @@ impl StoreOpaque { )?; } - ReadState::HostReady { .. } | ReadState::HostToHost { .. } => {} + ReadState::HostReady { .. } | ReadState::HostToHost { .. } => { + // A host consumer (e.g. one registered via `StreamReader::pipe`) + // is only driven on guest writes; it is never re-polled to + // observe the guest dropping the write end. Reclaim the transmit + // (state + both handles) so it does not leak. Unlike + // `host_drop_reader`, the write end is not forced to `Dropped` + // earlier in this function, so only finalize once the writer is + // actually gone -- otherwise we would discard a still-live host + // writer. The consumer is dropped along with the matched value. + if matches!( + self.concurrent_state_mut().get_mut(transmit_id)?.write, + WriteState::Dropped + ) { + log::trace!("host_drop_writer: finalize host consumer, delete {transmit_id:?}"); + self.concurrent_state_mut().delete_transmit(transmit_id)?; + } + } // If the read state is open, then there are no registered readers of the stream/future ReadState::Open => {