Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 49 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ ahash = { version = "0.8", default-features = false, features = ["std"] }
anyhow = "1.0.68"
anymap = "0.12"
arrayvec = "0.7.2"
async-channel = "2.5"
async-stream = "0.3.6"
async-trait = "0.1.68"
axum = { version = "0.7", features = ["tracing"] }
Expand Down
1 change: 1 addition & 0 deletions crates/durability/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ fallocate = ["spacetimedb-commitlog/fallocate"]

[dependencies]
anyhow.workspace = true
async-channel.workspace = true
futures.workspace = true
itertools.workspace = true
log.workspace = true
Expand Down
56 changes: 30 additions & 26 deletions crates/durability/src/imp/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ pub struct Local<T> {
///
/// The queue is bounded to
/// `Options::QUEUE_CAPACITY_MULTIPLIER * Options::batch_capacity`.
queue: mpsc::Sender<PreparedTx<Txdata<T>>>,
queue: async_channel::Sender<PreparedTx<Txdata<T>>>,
/// How many transactions are pending durability, including items buffered
/// in the queue and items currently being written by the actor.
///
Expand Down Expand Up @@ -137,7 +137,7 @@ impl<T: Encode + Send + Sync + 'static> Local<T> {
on_new_segment,
)?);
let queue_capacity = opts.queue_capacity();
let (queue, txdata_rx) = mpsc::channel(queue_capacity);
let (queue, txdata_rx) = async_channel::bounded(queue_capacity);
let queue_depth = Arc::new(AtomicU64::new(0));
let (durable_tx, durable_rx) = watch::channel(clog.max_committed_offset());
let (shutdown_tx, shutdown_rx) = mpsc::channel(1);
Expand Down Expand Up @@ -218,7 +218,7 @@ impl<T: Encode + Send + Sync + 'static> Actor<T> {
#[instrument(name = "durability::local::actor", skip_all)]
async fn run(
self,
mut transactions_rx: mpsc::Receiver<PreparedTx<Txdata<T>>>,
transactions_rx: async_channel::Receiver<PreparedTx<Txdata<T>>>,
mut shutdown_rx: mpsc::Receiver<oneshot::Sender<OwnedNotified>>,
) {
info!("starting durability actor");
Expand All @@ -244,7 +244,7 @@ impl<T: Encode + Send + Sync + 'static> Actor<T> {
// potentially requiring the `tx_buf` to allocate additional
// capacity.
// We'll reclaim capacity in excess of `self.batch_size` below.
n = transactions_rx.recv_many(&mut tx_buf, usize::MAX) => {
n = recv_many(&transactions_rx, &mut tx_buf, usize::MAX) => {
if n == 0 {
break;
}
Expand Down Expand Up @@ -344,29 +344,8 @@ impl<T: Send + Sync + 'static> Durability for Local<T> {
type TxData = Txdata<T>;

fn append_tx(&self, tx: PreparedTx<Self::TxData>) {
let mut tx = Some(tx);
let blocked = match self.queue.try_reserve() {
Ok(permit) => {
permit.send(tx.take().expect("tx already sent"));
false
}
Err(mpsc::error::TrySendError::Closed(_)) => {
panic!("durability actor crashed");
}
Err(mpsc::error::TrySendError::Full(_)) => {
let mut send = || self.queue.blocking_send(tx.take().expect("tx already sent"));
if tokio::runtime::Handle::try_current().is_ok() {
tokio::task::block_in_place(send)
} else {
send()
}
.expect("durability actor crashed");
true
}
};

self.queue.send_blocking(tx).expect("local durability: actor vanished");
self.queue_depth.fetch_add(1, Relaxed);
let _ = blocked;
}

fn durable_tx_offset(&self) -> DurableOffset {
Expand Down Expand Up @@ -436,3 +415,28 @@ impl<T: Encode + 'static> History for Commitlog<Txdata<T>> {
(min, max)
}
}

/// Implement tokio's `recv_many` for an `async_channel` receiver.
async fn recv_many<T>(chan: &async_channel::Receiver<T>, buf: &mut Vec<T>, limit: usize) -> usize {
let mut n = 0;
if !chan.is_empty() {
buf.reserve(chan.len().min(limit));
while n < limit {
let Ok(val) = chan.try_recv() else {
break;
};
buf.push(val);
n += 1;
}
}

if n == 0 {
let Ok(val) = chan.recv().await else {
return n;
};
buf.push(val);
n += 1;
}

n
}
Loading
Loading