From d5bfdc8bf3785960d402094c76e528cd5db5fa55 Mon Sep 17 00:00:00 2001 From: "gulbrand.dev" Date: Sun, 31 May 2026 21:57:53 -0700 Subject: [PATCH] feat: add duplex stream API (interface only; backends to follow) Introduce the public duplex stream contract without any backend implementation. Hosts inherit trait defaults: supports_duplex() returns false and build_duplex_stream_raw returns UnsupportedOperation. --- CHANGELOG.md | 1 + src/duplex.rs | 47 ++++++++++++++++++ src/lib.rs | 2 + src/platform/mod.rs | 40 +++++++++++++++ src/traits.rs | 118 ++++++++++++++++++++++++++++++++++++++++++-- 5 files changed, 204 insertions(+), 4 deletions(-) create mode 100644 src/duplex.rs diff --git a/CHANGELOG.md b/CHANGELOG.md index 53927a27c..f6ceb6fd0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - `realtime` feature for real-time audio thread scheduling without a D-Bus build dependency. - `StreamTrait::now()` to query the current instant on the stream's clock. - `StreamTrait::buffer_size()` to query the stream's current buffer size in frames per callback. +- Added duplex stream API (interface only; backends to follow). - `SAMPLE_RATE_CD` (44100 Hz) and `SAMPLE_RATE_48K` (48000 Hz) constants. - `SupportedStreamConfigRange::try_with_standard_sample_rate()` and `with_standard_sample_rate()` to select 48 kHz or 44.1 kHz from a range. diff --git a/src/duplex.rs b/src/duplex.rs new file mode 100644 index 000000000..7ed62b521 --- /dev/null +++ b/src/duplex.rs @@ -0,0 +1,47 @@ +use crate::{ChannelCount, InputStreamTimestamp, OutputStreamTimestamp, SampleRate}; + +/// Information relevant to a single call to the user's duplex stream data callback. +/// +/// Combines the input and output timestamps for the callback. Because a duplex stream's input and +/// output share a single clock, both timestamps are drawn from the same time source. +#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)] +pub struct DuplexCallbackInfo { + input_timestamp: InputStreamTimestamp, + output_timestamp: OutputStreamTimestamp, +} + +impl DuplexCallbackInfo { + /// Construct a `DuplexCallbackInfo` from its input and output timestamps. + pub fn new( + input_timestamp: InputStreamTimestamp, + output_timestamp: OutputStreamTimestamp, + ) -> Self { + Self { + input_timestamp, + output_timestamp, + } + } + + /// The timestamp for the captured input data passed to the callback. + pub fn input_timestamp(&self) -> InputStreamTimestamp { + self.input_timestamp + } + + /// The timestamp for the output data written by the callback. + pub fn output_timestamp(&self) -> OutputStreamTimestamp { + self.output_timestamp + } +} + +/// The configuration shared by both directions of a duplex stream. +#[derive(Clone, Copy, Debug, Eq, PartialEq)] +pub struct DuplexStreamConfig { + /// The number of input (capture) channels. + pub input_channels: ChannelCount, + /// The number of output (playback) channels. + pub output_channels: ChannelCount, + /// The sample rate driving both directions. + pub sample_rate: SampleRate, + /// The desired buffer size, in frames per callback. + pub buffer_size: crate::BufferSize, +} diff --git a/src/lib.rs b/src/lib.rs index cf11e08eb..58182a1d8 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -184,6 +184,7 @@ pub use sample_format::{FromSample, Sample, SampleFormat, SizedSample, I24, U24} use wasm_bindgen::prelude::*; pub mod device_description; +mod duplex; mod error; mod host; pub mod platform; @@ -489,6 +490,7 @@ pub struct Data { sample_format: SampleFormat, } +pub use duplex::{DuplexCallbackInfo, DuplexStreamConfig}; pub use timestamp::{ InputCallbackInfo, InputStreamTimestamp, OutputCallbackInfo, OutputStreamTimestamp, StreamInstant, diff --git a/src/platform/mod.rs b/src/platform/mod.rs index 83f2b32d1..9f86304a0 100644 --- a/src/platform/mod.rs +++ b/src/platform/mod.rs @@ -525,6 +525,46 @@ macro_rules! impl_platform_host { )* } } + + fn supports_duplex(&self) -> bool { + match self.0 { + $( + $(#[cfg($feat)])? + DeviceInner::$HostVariant(ref d) => d.supports_duplex(), + )* + } + } + + fn build_duplex_stream_raw( + &self, + config: crate::DuplexStreamConfig, + sample_format: crate::SampleFormat, + data_callback: D, + error_callback: E, + timeout: Option, + ) -> Result + where + D: FnMut(&crate::Data, &mut crate::Data, &crate::DuplexCallbackInfo) + + Send + + 'static, + E: FnMut(crate::Error) + Send + 'static, + { + match self.0 { + $( + $(#[cfg($feat)])? + DeviceInner::$HostVariant(ref d) => d + .build_duplex_stream_raw( + config, + sample_format, + data_callback, + error_callback, + timeout, + ) + .map(StreamInner::$HostVariant) + .map(Stream::from), + )* + } + } } impl crate::traits::HostTrait for Host { diff --git a/src/traits.rs b/src/traits.rs index 4491ad979..6d71afa6f 100644 --- a/src/traits.rs +++ b/src/traits.rs @@ -12,9 +12,9 @@ use std::{ }; use crate::{ - Data, DeviceDescription, DeviceId, Error, InputCallbackInfo, InputDevices, OutputCallbackInfo, - OutputDevices, SampleFormat, SizedSample, StreamConfig, StreamInstant, SupportedStreamConfig, - SupportedStreamConfigRange, + Data, DeviceDescription, DeviceId, DuplexCallbackInfo, DuplexStreamConfig, Error, ErrorKind, + InputCallbackInfo, InputDevices, OutputCallbackInfo, OutputDevices, SampleFormat, SizedSample, + StreamConfig, StreamInstant, SupportedStreamConfig, SupportedStreamConfigRange, }; /// A [`Host`] provides access to the available audio devices on the system. @@ -114,10 +114,12 @@ pub trait DeviceTrait: PartialEq + Eq + Hash + Debug + Display { type SupportedInputConfigs: Iterator; /// The iterator type yielding supported output stream formats. type SupportedOutputConfigs: Iterator; - /// The stream type created by [`build_input_stream_raw`] and [`build_output_stream_raw`]. + /// The stream type created by [`build_input_stream_raw`] and [`build_output_stream_raw`], + /// and [`build_duplex_stream_raw`]. /// /// [`build_input_stream_raw`]: Self::build_input_stream_raw /// [`build_output_stream_raw`]: Self::build_output_stream_raw + /// [`build_duplex_stream_raw`]: Self::build_duplex_stream_raw type Stream: StreamTrait; /// Structured description of the device with metadata. @@ -160,6 +162,18 @@ pub trait DeviceTrait: PartialEq + Eq + Hash + Debug + Display { .is_ok_and(|mut iter| iter.next().is_some()) } + /// True if the device can build a synchronized duplex stream where the captured input and + /// rendered output share a single clock. + /// + /// Returning `true` is a contract that input and output sides will run from one device-level + /// callback, or an OS driver aggregate (such as an Aggregate Device on MacOS). + /// + /// The default implementation returns `false`; hosts that can guarantee a shared clock should + /// override. + fn supports_duplex(&self) -> bool { + false + } + /// An iterator yielding input stream configurations that are supported by the device. /// /// # Errors @@ -407,6 +421,102 @@ pub trait DeviceTrait: PartialEq + Eq + Hash + Debug + Display { where D: FnMut(&mut Data, &OutputCallbackInfo) + Send + 'static, E: FnMut(Error) + Send + 'static; + + /// Create a synchronized duplex stream whose input and output share the same clock + /// or OS provided bidirectional aggregate device (MacOS). MacOS Aggregate device drift + /// compensation is not required. + /// + /// # Parameters + /// + /// * `config` - Channels, sample rate, and buffer size shared by both directions. + /// * `data_callback` - Called periodically with captured input and a mutable output buffer. + /// * `error_callback` - Called when a stream error occurs (e.g., device disconnected). + /// * `timeout` - Time to wait for the backend to initialize the stream. `None` waits + /// indefinitely. Note: not all backends honor this value. + /// + /// # Errors + /// + /// - [`ErrorKind::UnsupportedOperation`] if the host or device does not support duplex + /// streams. + /// - [`ErrorKind::UnsupportedConfig`] if the sample rate, channel counts, buffer size, or + /// sample format is not supported by the device. + /// - [`ErrorKind::DeviceNotAvailable`] if the device has been disconnected. + /// - [`ErrorKind::DeviceBusy`] if the device is temporarily in use by another application. + /// - [`ErrorKind::PermissionDenied`] if the process lacks permission to access the device + /// (e.g. microphone access on macOS). + /// - [`ErrorKind::InvalidInput`] if the configuration parameters are invalid. + /// - [`ErrorKind::StreamInvalidated`] if the device's sample rate or buffer size changed + /// during stream creation, or an internal lock was poisoned. + /// - [`ErrorKind::ResourceExhausted`] if the host fails to spawn an internal monitoring + /// thread. + /// - [`ErrorKind::BackendError`] for unclassified backend failures. + /// + /// [`ErrorKind::UnsupportedOperation`]: crate::ErrorKind::UnsupportedOperation + /// [`ErrorKind::UnsupportedConfig`]: crate::ErrorKind::UnsupportedConfig + /// [`ErrorKind::DeviceNotAvailable`]: crate::ErrorKind::DeviceNotAvailable + /// [`ErrorKind::DeviceBusy`]: crate::ErrorKind::DeviceBusy + /// [`ErrorKind::PermissionDenied`]: crate::ErrorKind::PermissionDenied + /// [`ErrorKind::InvalidInput`]: crate::ErrorKind::InvalidInput + /// [`ErrorKind::StreamInvalidated`]: crate::ErrorKind::StreamInvalidated + /// [`ErrorKind::ResourceExhausted`]: crate::ErrorKind::ResourceExhausted + /// [`ErrorKind::BackendError`]: crate::ErrorKind::BackendError + fn build_duplex_stream( + &self, + config: DuplexStreamConfig, + mut data_callback: D, + error_callback: E, + timeout: Option, + ) -> Result + where + T: SizedSample, + D: FnMut(&[T], &mut [T], &DuplexCallbackInfo) + Send + 'static, + E: FnMut(Error) + Send + 'static, + { + self.build_duplex_stream_raw( + config, + T::FORMAT, + move |input, output, info| { + data_callback( + input + .as_slice() + .expect("host supplied incorrect sample type"), + output + .as_slice_mut() + .expect("host supplied incorrect sample type"), + info, + ) + }, + error_callback, + timeout, + ) + } + + /// Create a dynamically typed synchronized duplex stream. + /// + /// Hosts that support duplex streams must override this method; + /// the default implementation returns [`ErrorKind::UnsupportedOperation`]. + /// + /// See [`build_duplex_stream`](Self::build_duplex_stream) for parameter and error + /// documentation. + /// + /// [`ErrorKind::UnsupportedOperation`]: crate::ErrorKind::UnsupportedOperation + fn build_duplex_stream_raw( + &self, + _config: DuplexStreamConfig, + _sample_format: SampleFormat, + _data_callback: D, + _error_callback: E, + _timeout: Option, + ) -> Result + where + D: FnMut(&Data, &mut Data, &DuplexCallbackInfo) + Send + 'static, + E: FnMut(Error) + Send + 'static, + { + Err(Error::with_message( + ErrorKind::UnsupportedOperation, + "duplex streams are not supported by this host", + )) + } } /// A stream created from [`Device`](DeviceTrait), with methods to control it.