diff --git a/Cargo.lock b/Cargo.lock index 4a42447a83f..4961c1e3cba 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -231,6 +231,18 @@ dependencies = [ "wait-timeout", ] +[[package]] +name = "async-channel" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "924ed96dd52d1b75e9c1a3e6275715fd320f5f9439fb5a4a11fa51f4221158d2" +dependencies = [ + "concurrent-queue", + "event-listener-strategy", + "futures-core", + "pin-project-lite", +] + [[package]] name = "async-scoped" version = "0.9.0" @@ -1086,6 +1098,15 @@ dependencies = [ "static_assertions", ] +[[package]] +name = "concurrent-queue" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4ca0197aee26d1ae37445ee532fefce43251d24cc7c166799f4d46817f1d3973" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "concurrent_lru" version = "0.2.0" @@ -2106,6 +2127,27 @@ dependencies = [ "serde", ] +[[package]] +name = "event-listener" +version = "5.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e13b66accf52311f30a0db42147dadea9850cb48cd070028831ae5f5d4b856ab" +dependencies = [ + "concurrent-queue", + "parking", + "pin-project-lite", +] + +[[package]] +name = "event-listener-strategy" +version = "0.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8be9f3dfaaffdae2972880079a491a1a8bb7cbed0b8dd7a347f668b4150a3b93" +dependencies = [ + "event-listener", + "pin-project-lite", +] + [[package]] name = "event-table-client" version = "2.1.0" @@ -5153,6 +5195,12 @@ dependencies = [ "unicode-width 0.1.14", ] +[[package]] +name = "parking" +version = "2.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f38d5652c16fde515bb1ecef450ab0f6a219d619a7274976324d5e377f7dceba" + [[package]] name = "parking_lot" version = "0.11.2" @@ -8125,6 +8173,7 @@ name = "spacetimedb-durability" version = "2.1.0" dependencies = [ "anyhow", + "async-channel", "futures", "itertools 0.12.1", "log", diff --git a/Cargo.toml b/Cargo.toml index e8938a0dcc5..371364a9a86 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -153,6 +153,7 @@ ahash = { version = "0.8", default-features = false, features = ["std"] } anyhow = "1.0.68" anymap = "0.12" arrayvec = "0.7.2" +async-channel = "2.5" async-stream = "0.3.6" async-trait = "0.1.68" axum = { version = "0.7", features = ["tracing"] } diff --git a/crates/durability/Cargo.toml b/crates/durability/Cargo.toml index fbc2eaa9fae..0ea8022fcbe 100644 --- a/crates/durability/Cargo.toml +++ b/crates/durability/Cargo.toml @@ -13,6 +13,7 @@ fallocate = ["spacetimedb-commitlog/fallocate"] [dependencies] anyhow.workspace = true +async-channel.workspace = true futures.workspace = true itertools.workspace = true log.workspace = true diff --git a/crates/durability/src/imp/local.rs b/crates/durability/src/imp/local.rs index 43388128eeb..ca11c01c4af 100644 --- a/crates/durability/src/imp/local.rs +++ b/crates/durability/src/imp/local.rs @@ -97,7 +97,7 @@ pub struct Local { /// /// The queue is bounded to /// `Options::QUEUE_CAPACITY_MULTIPLIER * Options::batch_capacity`. - queue: mpsc::Sender>>, + queue: async_channel::Sender>>, /// How many transactions are pending durability, including items buffered /// in the queue and items currently being written by the actor. /// @@ -137,7 +137,7 @@ impl Local { on_new_segment, )?); let queue_capacity = opts.queue_capacity(); - let (queue, txdata_rx) = mpsc::channel(queue_capacity); + let (queue, txdata_rx) = async_channel::bounded(queue_capacity); let queue_depth = Arc::new(AtomicU64::new(0)); let (durable_tx, durable_rx) = watch::channel(clog.max_committed_offset()); let (shutdown_tx, shutdown_rx) = mpsc::channel(1); @@ -218,7 +218,7 @@ impl Actor { #[instrument(name = "durability::local::actor", skip_all)] async fn run( self, - mut transactions_rx: mpsc::Receiver>>, + transactions_rx: async_channel::Receiver>>, mut shutdown_rx: mpsc::Receiver>, ) { info!("starting durability actor"); @@ -244,7 +244,7 @@ impl Actor { // potentially requiring the `tx_buf` to allocate additional // capacity. // We'll reclaim capacity in excess of `self.batch_size` below. - n = transactions_rx.recv_many(&mut tx_buf, usize::MAX) => { + n = recv_many(&transactions_rx, &mut tx_buf, usize::MAX) => { if n == 0 { break; } @@ -344,29 +344,8 @@ impl Durability for Local { type TxData = Txdata; fn append_tx(&self, tx: PreparedTx) { - let mut tx = Some(tx); - let blocked = match self.queue.try_reserve() { - Ok(permit) => { - permit.send(tx.take().expect("tx already sent")); - false - } - Err(mpsc::error::TrySendError::Closed(_)) => { - panic!("durability actor crashed"); - } - Err(mpsc::error::TrySendError::Full(_)) => { - let mut send = || self.queue.blocking_send(tx.take().expect("tx already sent")); - if tokio::runtime::Handle::try_current().is_ok() { - tokio::task::block_in_place(send) - } else { - send() - } - .expect("durability actor crashed"); - true - } - }; - + self.queue.send_blocking(tx).expect("local durability: actor vanished"); self.queue_depth.fetch_add(1, Relaxed); - let _ = blocked; } fn durable_tx_offset(&self) -> DurableOffset { @@ -436,3 +415,28 @@ impl History for Commitlog> { (min, max) } } + +/// Implement tokio's `recv_many` for an `async_channel` receiver. +async fn recv_many(chan: &async_channel::Receiver, buf: &mut Vec, limit: usize) -> usize { + let mut n = 0; + if !chan.is_empty() { + buf.reserve(chan.len().min(limit)); + while n < limit { + let Ok(val) = chan.try_recv() else { + break; + }; + buf.push(val); + n += 1; + } + } + + if n == 0 { + let Ok(val) = chan.recv().await else { + return n; + }; + buf.push(val); + n += 1; + } + + n +} diff --git a/crates/durability/src/imp/local.rs.orig b/crates/durability/src/imp/local.rs.orig deleted file mode 100644 index cbfcdce2f16..00000000000 --- a/crates/durability/src/imp/local.rs.orig +++ /dev/null @@ -1,416 +0,0 @@ -use std::{ - io, - num::NonZeroUsize, - path::PathBuf, - sync::{ - atomic::{AtomicU64, Ordering::Relaxed}, - Arc, - }, -}; - -use futures::{FutureExt as _, TryFutureExt as _}; -use itertools::Itertools as _; -use log::{info, trace, warn}; -use scopeguard::ScopeGuard; -use spacetimedb_commitlog::{error, payload::Txdata, Commit, Commitlog, Decoder, Encode, Transaction}; -use spacetimedb_fs_utils::lockfile::advisory::{LockError, LockedFile}; -use spacetimedb_paths::server::ReplicaDir; -use thiserror::Error; -use tokio::{ - sync::{futures::OwnedNotified, mpsc, oneshot, watch, Notify}, - task::{spawn_blocking, AbortHandle}, -}; -use tracing::{instrument, Span}; - -use crate::{Close, Durability, DurableOffset, History, TxOffset}; - -pub use spacetimedb_commitlog::repo::{OnNewSegmentFn, SizeOnDisk}; - -/// [`Local`] configuration. -#[derive(Clone, Copy, Debug)] -pub struct Options { -<<<<<<< conflict 1 of 2 -+++++++ ssmwmsoq d6e2ba51 "durability: Flush batches (#4478)" - /// The number of elements to reserve for batching transactions. - /// - /// This puts an upper bound on the buffer capacity, while not preventing - /// reallocations when the number of queued transactions exceeds it. - /// - /// In other words, the durability actor will attempt to receive all - /// transactions that are currently in the queue, but shrink the buffer to - /// `batch_capacity` if it had to make additional space during a burst. - /// - /// Default: 4096 - pub batch_capacity: NonZeroUsize, -%%%%%%% diff from: ttnusruw 6fea15f7 "[C#] Update RawTableIterBase.Enumerator to use `ArrayPool` for buffer (#4385)" -\\\\\\\ to: ppnmwost abbcec4a "keynote-2: alpha -> 1.5, withConfirmedReads(true), remove warmup (#4492)" - /// Periodically flush and sync the log this often. - /// -- /// Default: 50ms -+ /// Default: 10ms - pub sync_interval: Duration, ->>>>>>> conflict 1 of 2 ends - /// [`Commitlog`] configuration. - pub commitlog: spacetimedb_commitlog::Options, -} - -impl Options { - pub const DEFAULT_BATCH_CAPACITY: NonZeroUsize = NonZeroUsize::new(4096).unwrap(); -} - -impl Default for Options { - fn default() -> Self { - Self { -<<<<<<< conflict 2 of 2 -+++++++ ssmwmsoq d6e2ba51 "durability: Flush batches (#4478)" - batch_capacity: Self::DEFAULT_BATCH_CAPACITY, -%%%%%%% diff from: ttnusruw 6fea15f7 "[C#] Update RawTableIterBase.Enumerator to use `ArrayPool` for buffer (#4385)" -\\\\\\\ to: ppnmwost abbcec4a "keynote-2: alpha -> 1.5, withConfirmedReads(true), remove warmup (#4492)" -- sync_interval: Duration::from_millis(50), -+ sync_interval: Duration::from_millis(10), ->>>>>>> conflict 2 of 2 ends - commitlog: Default::default(), - } - } -} - -#[derive(Debug, Error)] -pub enum OpenError { - #[error("commitlog directory is locked")] - Lock(#[from] LockError), - #[error("failed to open commitlog")] - Commitlog(#[from] io::Error), -} - -type ShutdownReply = oneshot::Sender; - -/// [`Durability`] implementation backed by a [`Commitlog`] on local storage. -/// -/// The commitlog is constrained to store the canonical [`Txdata`] payload, -/// where the generic parameter `T` is the type of the row data stored in -/// the mutations section. -/// -/// `T` is left generic in order to allow bypassing the `ProductValue` -/// intermediate representation in the future. -/// -/// Note, however, that instantiating `T` to a different type may require to -/// change the log format version! -pub struct Local { - /// The [`Commitlog`] this [`Durability`] and [`History`] impl wraps. - clog: Arc>>, - /// The durable transaction offset, as reported by the background - /// [`FlushAndSyncTask`]. - durable_offset: watch::Receiver>, - /// Backlog of transactions to be written to disk by the background - /// [`PersisterTask`]. - /// - /// Note that this is unbounded! - queue: mpsc::UnboundedSender>>, - /// How many transactions are sitting in the `queue`. - /// - /// This is mainly for observability purposes, and can thus be updated with - /// relaxed memory ordering. - queue_depth: Arc, - /// Channel to request the actor to exit. - shutdown: mpsc::Sender, - /// [AbortHandle] to force cancellation of the [Actor]. - abort: AbortHandle, -} - -impl Local { - /// Create a [`Local`] instance at the `replica_dir`. - /// - /// `replica_dir` must already exist. - /// - /// Background tasks are spawned onto the provided tokio runtime. - /// - /// We will send a message down the `on_new_segment` channel whenever we begin a new commitlog segment. - /// This is used to capture a snapshot each new segment. - pub fn open( - replica_dir: ReplicaDir, - rt: tokio::runtime::Handle, - opts: Options, - on_new_segment: Option>, - ) -> Result { - info!("open local durability"); - - // We could just place a lock on the commitlog directory, - // yet for backwards-compatibility, we keep using the `db.lock` file. - let lock = Lock::create(replica_dir.0.join("db.lock"))?; - - let clog = Arc::new(Commitlog::open( - replica_dir.commit_log(), - opts.commitlog, - on_new_segment, - )?); - let (queue, txdata_rx) = mpsc::unbounded_channel(); - let queue_depth = Arc::new(AtomicU64::new(0)); - let (durable_tx, durable_rx) = watch::channel(clog.max_committed_offset()); - let (shutdown_tx, shutdown_rx) = mpsc::channel(1); - - let abort = rt - .spawn( - Actor { - clog: clog.clone(), - - durable_offset: durable_tx, - queue_depth: queue_depth.clone(), - - batch_capacity: opts.batch_capacity, - - lock, - } - .run(txdata_rx, shutdown_rx), - ) - .abort_handle(); - - Ok(Self { - clog, - durable_offset: durable_rx, - queue, - shutdown: shutdown_tx, - queue_depth, - abort, - }) - } - - /// Obtain a read-only copy of the durable state that implements [History]. - pub fn as_history(&self) -> impl History> { - self.clog.clone() - } -} - -impl Local { - /// Inspect how many transactions added via [`Self::append_tx`] are pending - /// to be applied to the underlying [`Commitlog`]. - pub fn queue_depth(&self) -> u64 { - self.queue_depth.load(Relaxed) - } - - /// Obtain an iterator over the [`Commit`]s in the underlying log. - pub fn commits_from(&self, offset: TxOffset) -> impl Iterator> { - self.clog.commits_from(offset).map_ok(Commit::from) - } - - /// Get a list of segment offsets, sorted in ascending order. - pub fn existing_segment_offsets(&self) -> io::Result> { - self.clog.existing_segment_offsets() - } - - /// Compress the segments at the offsets provided, marking them as immutable. - pub fn compress_segments(&self, offsets: &[TxOffset]) -> io::Result<()> { - self.clog.compress_segments(offsets) - } - - /// Get the size on disk of the underlying [`Commitlog`]. - pub fn size_on_disk(&self) -> io::Result { - self.clog.size_on_disk() - } -} - -struct Actor { - clog: Arc>>, - - durable_offset: watch::Sender>, - queue_depth: Arc, - - batch_capacity: NonZeroUsize, - - #[allow(unused)] - lock: Lock, -} - -impl Actor { - #[instrument(name = "durability::local::actor", skip_all)] - async fn run( - self, - mut transactions_rx: mpsc::UnboundedReceiver>>, - mut shutdown_rx: mpsc::Receiver>, - ) { - info!("starting durability actor"); - - let mut tx_buf = Vec::with_capacity(self.batch_capacity.get()); - // `flush_and_sync` when the loop exits without panicking, - // or `flush_and_sync` inside the loop failed. - let mut sync_on_exit = true; - - loop { - tokio::select! { - // Biased towards the shutdown channel, - // so that we stop accepting new data promptly after - // `Durability::close` was called. - biased; - - Some(reply) = shutdown_rx.recv() => { - transactions_rx.close(); - let _ = reply.send(self.lock.notified()); - }, - - // Pop as many elements from the channel as possible, - // potentially requiring the `tx_buf` to allocate additional - // capacity. - // We'll reclaim capacity in excess of `self.batch_size` below. - n = transactions_rx.recv_many(&mut tx_buf, usize::MAX) => { - if n == 0 { - break; - } - self.queue_depth.fetch_sub(n as u64, Relaxed); - let clog = self.clog.clone(); - tx_buf = spawn_blocking(move || -> io::Result>>> { - for tx in tx_buf.drain(..) { - clog.commit([tx])?; - } - Ok(tx_buf) - }) - .await - .expect("commitlog write panicked") - .expect("commitlog write failed"); - if self.flush_and_sync().await.is_err() { - sync_on_exit = false; - break; - } - // Reclaim burst capacity. - tx_buf.shrink_to(self.batch_capacity.get()); - }, - } - } - - if sync_on_exit { - let _ = self.flush_and_sync().await; - } - - info!("exiting durability actor"); - } - - #[instrument(skip_all)] - async fn flush_and_sync(&self) -> io::Result> { - // Skip if nothing changed. - if let Some((committed, durable)) = self.clog.max_committed_offset().zip(*self.durable_offset.borrow()) { - if committed == durable { - return Ok(None); - } - } - - let clog = self.clog.clone(); - let span = Span::current(); - spawn_blocking(move || { - let _span = span.enter(); - clog.flush_and_sync() - }) - .await - .expect("commitlog flush-and-sync blocking task panicked") - .inspect_err(|e| warn!("error flushing commitlog: {e:#}")) - .inspect(|maybe_offset| { - if let Some(new_offset) = maybe_offset { - trace!("synced to offset {new_offset}"); - self.durable_offset.send_modify(|val| { - val.replace(*new_offset); - }); - } - }) - } -} - -struct Lock { - file: Option, - notify_on_drop: Arc, -} - -impl Lock { - pub fn create(path: PathBuf) -> Result { - let file = LockedFile::lock(path).map(Some)?; - let notify_on_drop = Arc::new(Notify::new()); - - Ok(Self { file, notify_on_drop }) - } - - pub fn notified(&self) -> OwnedNotified { - self.notify_on_drop.clone().notified_owned() - } -} - -impl Drop for Lock { - fn drop(&mut self) { - // Ensure the file lock is dropped before notifying. - if let Some(file) = self.file.take() { - drop(file); - } - self.notify_on_drop.notify_waiters(); - } -} - -impl Durability for Local { - type TxData = Txdata; - - fn append_tx(&self, tx: Transaction) { - self.queue.send(tx).expect("durability actor crashed"); - self.queue_depth.fetch_add(1, Relaxed); - } - - fn durable_tx_offset(&self) -> DurableOffset { - self.durable_offset.clone().into() - } - - fn close(&self) -> Close { - info!("close local durability"); - - let durable_offset = self.durable_tx_offset(); - let shutdown = self.shutdown.clone(); - // Abort actor if shutdown future is dropped. - let abort = scopeguard::guard(self.abort.clone(), |actor| { - warn!("close future dropped, aborting durability actor"); - actor.abort(); - }); - - async move { - let (done_tx, done_rx) = oneshot::channel(); - // Ignore channel errors - those just mean the actor is already gone. - let _ = shutdown - .send(done_tx) - .map_err(drop) - .and_then(|()| done_rx.map_err(drop)) - .and_then(|done| async move { - done.await; - Ok(()) - }) - .await; - // Don't abort if we completed normally. - let _ = ScopeGuard::into_inner(abort); - - durable_offset.last_seen() - } - .boxed() - } -} - -impl History for Commitlog> { - type TxData = Txdata; - - fn fold_transactions_from(&self, offset: TxOffset, decoder: D) -> Result<(), D::Error> - where - D: Decoder, - D::Error: From, - { - self.fold_transactions_from(offset, decoder) - } - - fn transactions_from<'a, D>( - &self, - offset: TxOffset, - decoder: &'a D, - ) -> impl Iterator, D::Error>> - where - D: Decoder, - D::Error: From, - Self::TxData: 'a, - { - self.transactions_from(offset, decoder) - } - - fn tx_range_hint(&self) -> (TxOffset, Option) { - let min = self.min_committed_offset().unwrap_or_default(); - let max = self.max_committed_offset(); - - (min, max) - } -} diff --git a/crates/durability/tests/io/fallocate.rs b/crates/durability/tests/io/fallocate.rs index 05695e20302..ae8071a53bd 100644 --- a/crates/durability/tests/io/fallocate.rs +++ b/crates/durability/tests/io/fallocate.rs @@ -73,7 +73,7 @@ async fn local_durability_cannot_be_created_if_not_enough_space() -> anyhow::Res // In reality, `append_tx` will fail at some point in the future. // I.e. transactions can be lost when the host runs out of disk space. #[tokio::test(flavor = "multi_thread")] -#[should_panic = "durability actor crashed"] +#[should_panic = "local durability: actor vanished"] async fn local_durability_crashes_on_new_segment_if_not_enough_space() { enable_logging();