durability: Use async-channel to allow blocking send (#4802)

The previous approaches would either:

- panic when the queue becomes full, because `append_tx` runs inside the
context of a `LocalSet`, which is basically a glorified current-thread
runtime, where `block_in_place` is not allowed (see the sketch below)
- deadlock, because the receiver runtime has no way of notifying the
sender of freed capacity in the channel
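
A minimal, hypothetical repro of the first failure mode (illustrative, not
code from this change): tokio forbids `block_in_place` on a current-thread
runtime, which is what a `LocalSet` is typically driven by, so the old
full-queue fallback panicked there.

```rust
use tokio::sync::mpsc;

fn main() {
    let rt = tokio::runtime::Builder::new_current_thread().build().unwrap();
    let (tx, _rx) = mpsc::channel::<u32>(1);
    tx.try_send(0).unwrap(); // fill the bounded queue

    rt.block_on(async move {
        // Panics: "can call blocking only when running on the multi-threaded runtime"
        let _ = tokio::task::block_in_place(|| tx.blocking_send(1));
    });
}
```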

`async-channel` handles wait queues and notifications internally, so it
can be used freely from either blocking or async contexts.
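
For illustration only, a sketch of the property this relies on: the same
`async-channel` endpoints can be driven from a plain OS thread and from a
future alike (the `futures::executor::block_on` executor here is an
assumption of this sketch, not the runtime the actor uses).

```rust
fn main() {
    let (tx, rx) = async_channel::bounded::<u32>(16);

    // Blocking context: `send_blocking` parks the thread while the queue
    // is full; async-channel's internal wait queue wakes it up again.
    let producer = std::thread::spawn(move || {
        for i in 0..1_000 {
            tx.send_blocking(i).expect("receiver dropped");
        }
        // `tx` dropped here closes the channel.
    });

    // Async context: `recv` awaits, yielding to the executor while empty.
    futures::executor::block_on(async {
        while rx.recv().await.is_ok() {}
    });

    producer.join().unwrap();
}
```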

This _may_ come with different performance characteristics, but I haven't
measured them.
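
If one wanted to measure, a rough, hypothetical harness could look like
the following (setup and numbers are illustrative; nothing here ships
with the commit):

```rust
use std::time::Instant;

fn main() {
    const N: u32 = 1_000_000;
    let (tx, rx) = async_channel::bounded::<u32>(1024);

    // Drain on a separate thread until the channel is closed.
    let consumer = std::thread::spawn(move || while rx.recv_blocking().is_ok() {});

    let start = Instant::now();
    for i in 0..N {
        tx.send_blocking(i).unwrap();
    }
    drop(tx); // close the channel so the consumer exits
    consumer.join().unwrap();
    println!("{N} blocking sends took {:?}", start.elapsed());
}
```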

---------

Co-authored-by: joshua-spacetime <josh@clockworklabs.io>
Author: Kim Altintop
Date: 2026-04-14 22:12:51 +02:00
Committed by: GitHub
Parent: 7b3bc01d68
Commit: 2b3aa5ae26

6 changed files with 82 additions and 443 deletions
Generated (+49)
@@ -231,6 +231,18 @@ dependencies = [
"wait-timeout",
]
+ [[package]]
+ name = "async-channel"
+ version = "2.5.0"
+ source = "registry+https://github.com/rust-lang/crates.io-index"
+ checksum = "924ed96dd52d1b75e9c1a3e6275715fd320f5f9439fb5a4a11fa51f4221158d2"
+ dependencies = [
+ "concurrent-queue",
+ "event-listener-strategy",
+ "futures-core",
+ "pin-project-lite",
+ ]
[[package]]
name = "async-scoped"
version = "0.9.0"
@@ -1086,6 +1098,15 @@ dependencies = [
"static_assertions",
]
+ [[package]]
+ name = "concurrent-queue"
+ version = "2.5.0"
+ source = "registry+https://github.com/rust-lang/crates.io-index"
+ checksum = "4ca0197aee26d1ae37445ee532fefce43251d24cc7c166799f4d46817f1d3973"
+ dependencies = [
+ "crossbeam-utils",
+ ]
[[package]]
name = "concurrent_lru"
version = "0.2.0"
@@ -2106,6 +2127,27 @@ dependencies = [
"serde",
]
+ [[package]]
+ name = "event-listener"
+ version = "5.4.1"
+ source = "registry+https://github.com/rust-lang/crates.io-index"
+ checksum = "e13b66accf52311f30a0db42147dadea9850cb48cd070028831ae5f5d4b856ab"
+ dependencies = [
+ "concurrent-queue",
+ "parking",
+ "pin-project-lite",
+ ]
+ [[package]]
+ name = "event-listener-strategy"
+ version = "0.5.4"
+ source = "registry+https://github.com/rust-lang/crates.io-index"
+ checksum = "8be9f3dfaaffdae2972880079a491a1a8bb7cbed0b8dd7a347f668b4150a3b93"
+ dependencies = [
+ "event-listener",
+ "pin-project-lite",
+ ]
[[package]]
name = "event-table-client"
version = "2.1.0"
@@ -5153,6 +5195,12 @@ dependencies = [
"unicode-width 0.1.14",
]
+ [[package]]
+ name = "parking"
+ version = "2.2.1"
+ source = "registry+https://github.com/rust-lang/crates.io-index"
+ checksum = "f38d5652c16fde515bb1ecef450ab0f6a219d619a7274976324d5e377f7dceba"
[[package]]
name = "parking_lot"
version = "0.11.2"
@@ -8125,6 +8173,7 @@ name = "spacetimedb-durability"
version = "2.1.0"
dependencies = [
"anyhow",
"async-channel",
"futures",
"itertools 0.12.1",
"log",
+1
@@ -153,6 +153,7 @@ ahash = { version = "0.8", default-features = false, features = ["std"] }
anyhow = "1.0.68"
anymap = "0.12"
arrayvec = "0.7.2"
async-channel = "2.5"
async-stream = "0.3.6"
async-trait = "0.1.68"
axum = { version = "0.7", features = ["tracing"] }
+1
@@ -13,6 +13,7 @@ fallocate = ["spacetimedb-commitlog/fallocate"]
[dependencies]
anyhow.workspace = true
+ async-channel.workspace = true
futures.workspace = true
itertools.workspace = true
log.workspace = true
+30 -26
@@ -97,7 +97,7 @@ pub struct Local<T> {
///
/// The queue is bounded to
/// `Options::QUEUE_CAPACITY_MULTIPLIER * Options::batch_capacity`.
- queue: mpsc::Sender<PreparedTx<Txdata<T>>>,
+ queue: async_channel::Sender<PreparedTx<Txdata<T>>>,
/// How many transactions are pending durability, including items buffered
/// in the queue and items currently being written by the actor.
///
@@ -137,7 +137,7 @@ impl<T: Encode + Send + Sync + 'static> Local<T> {
on_new_segment,
)?);
let queue_capacity = opts.queue_capacity();
- let (queue, txdata_rx) = mpsc::channel(queue_capacity);
+ let (queue, txdata_rx) = async_channel::bounded(queue_capacity);
let queue_depth = Arc::new(AtomicU64::new(0));
let (durable_tx, durable_rx) = watch::channel(clog.max_committed_offset());
let (shutdown_tx, shutdown_rx) = mpsc::channel(1);
@@ -218,7 +218,7 @@ impl<T: Encode + Send + Sync + 'static> Actor<T> {
#[instrument(name = "durability::local::actor", skip_all)]
async fn run(
self,
- mut transactions_rx: mpsc::Receiver<PreparedTx<Txdata<T>>>,
+ transactions_rx: async_channel::Receiver<PreparedTx<Txdata<T>>>,
mut shutdown_rx: mpsc::Receiver<oneshot::Sender<OwnedNotified>>,
) {
info!("starting durability actor");
@@ -244,7 +244,7 @@ impl<T: Encode + Send + Sync + 'static> Actor<T> {
// potentially requiring the `tx_buf` to allocate additional
// capacity.
// We'll reclaim capacity in excess of `self.batch_size` below.
- n = transactions_rx.recv_many(&mut tx_buf, usize::MAX) => {
+ n = recv_many(&transactions_rx, &mut tx_buf, usize::MAX) => {
if n == 0 {
break;
}
@@ -344,29 +344,8 @@ impl<T: Send + Sync + 'static> Durability for Local<T> {
type TxData = Txdata<T>;
fn append_tx(&self, tx: PreparedTx<Self::TxData>) {
- let mut tx = Some(tx);
- let blocked = match self.queue.try_reserve() {
- Ok(permit) => {
- permit.send(tx.take().expect("tx already sent"));
- false
- }
- Err(mpsc::error::TrySendError::Closed(_)) => {
- panic!("durability actor crashed");
- }
- Err(mpsc::error::TrySendError::Full(_)) => {
- let mut send = || self.queue.blocking_send(tx.take().expect("tx already sent"));
- if tokio::runtime::Handle::try_current().is_ok() {
- tokio::task::block_in_place(send)
- } else {
- send()
- }
- .expect("durability actor crashed");
- true
- }
- };
+ self.queue.send_blocking(tx).expect("local durability: actor vanished");
self.queue_depth.fetch_add(1, Relaxed);
- let _ = blocked;
}
fn durable_tx_offset(&self) -> DurableOffset {
@@ -436,3 +415,28 @@ impl<T: Encode + 'static> History for Commitlog<Txdata<T>> {
(min, max)
}
}
+ /// Implement tokio's `recv_many` for an `async_channel` receiver.
+ async fn recv_many<T>(chan: &async_channel::Receiver<T>, buf: &mut Vec<T>, limit: usize) -> usize {
+ let mut n = 0;
+ if !chan.is_empty() {
+ buf.reserve(chan.len().min(limit));
+ while n < limit {
+ let Ok(val) = chan.try_recv() else {
+ break;
+ };
+ buf.push(val);
+ n += 1;
+ }
+ }
+ if n == 0 {
+ let Ok(val) = chan.recv().await else {
+ return n;
+ };
+ buf.push(val);
+ n += 1;
+ }
+ n
+ }
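
A hypothetical usage sketch of the semantics above (not part of the diff):
draining returns everything currently queued, an open-but-empty channel
awaits at least one element, and a closed, empty channel yields 0.

```rust
#[tokio::test]
async fn recv_many_drains_in_batches() {
    let (tx, rx) = async_channel::bounded::<u32>(8);
    for i in 0..5 {
        tx.send(i).await.unwrap();
    }
    let mut buf = Vec::new();
    // Everything currently queued comes out in one call.
    assert_eq!(recv_many(&rx, &mut buf, usize::MAX).await, 5);
    // An empty-but-open channel waits for at least one element.
    tx.send(42).await.unwrap();
    assert_eq!(recv_many(&rx, &mut buf, usize::MAX).await, 1);
    // A closed and empty channel returns 0, signalling shutdown.
    drop(tx);
    assert_eq!(recv_many(&rx, &mut buf, usize::MAX).await, 0);
}
```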
-416
@@ -1,416 +0,0 @@
use std::{
io,
num::NonZeroUsize,
path::PathBuf,
sync::{
atomic::{AtomicU64, Ordering::Relaxed},
Arc,
},
};
use futures::{FutureExt as _, TryFutureExt as _};
use itertools::Itertools as _;
use log::{info, trace, warn};
use scopeguard::ScopeGuard;
use spacetimedb_commitlog::{error, payload::Txdata, Commit, Commitlog, Decoder, Encode, Transaction};
use spacetimedb_fs_utils::lockfile::advisory::{LockError, LockedFile};
use spacetimedb_paths::server::ReplicaDir;
use thiserror::Error;
use tokio::{
sync::{futures::OwnedNotified, mpsc, oneshot, watch, Notify},
task::{spawn_blocking, AbortHandle},
};
use tracing::{instrument, Span};
use crate::{Close, Durability, DurableOffset, History, TxOffset};
pub use spacetimedb_commitlog::repo::{OnNewSegmentFn, SizeOnDisk};
/// [`Local`] configuration.
#[derive(Clone, Copy, Debug)]
pub struct Options {
/// The number of elements to reserve for batching transactions.
///
/// This puts an upper bound on the buffer capacity, while not preventing
/// reallocations when the number of queued transactions exceeds it.
///
/// In other words, the durability actor will attempt to receive all
/// transactions that are currently in the queue, but shrink the buffer to
/// `batch_capacity` if it had to make additional space during a burst.
///
/// Default: 4096
pub batch_capacity: NonZeroUsize,
/// [`Commitlog`] configuration.
pub commitlog: spacetimedb_commitlog::Options,
}
impl Options {
pub const DEFAULT_BATCH_CAPACITY: NonZeroUsize = NonZeroUsize::new(4096).unwrap();
}
impl Default for Options {
fn default() -> Self {
Self {
batch_capacity: Self::DEFAULT_BATCH_CAPACITY,
commitlog: Default::default(),
}
}
}
#[derive(Debug, Error)]
pub enum OpenError {
#[error("commitlog directory is locked")]
Lock(#[from] LockError),
#[error("failed to open commitlog")]
Commitlog(#[from] io::Error),
}
type ShutdownReply = oneshot::Sender<OwnedNotified>;
/// [`Durability`] implementation backed by a [`Commitlog`] on local storage.
///
/// The commitlog is constrained to store the canonical [`Txdata`] payload,
/// where the generic parameter `T` is the type of the row data stored in
/// the mutations section.
///
/// `T` is left generic in order to allow bypassing the `ProductValue`
/// intermediate representation in the future.
///
/// Note, however, that instantiating `T` to a different type may require to
/// change the log format version!
pub struct Local<T> {
/// The [`Commitlog`] this [`Durability`] and [`History`] impl wraps.
clog: Arc<Commitlog<Txdata<T>>>,
/// The durable transaction offset, as reported by the background
/// [`FlushAndSyncTask`].
durable_offset: watch::Receiver<Option<TxOffset>>,
/// Backlog of transactions to be written to disk by the background
/// [`PersisterTask`].
///
/// Note that this is unbounded!
queue: mpsc::UnboundedSender<Transaction<Txdata<T>>>,
/// How many transactions are sitting in the `queue`.
///
/// This is mainly for observability purposes, and can thus be updated with
/// relaxed memory ordering.
queue_depth: Arc<AtomicU64>,
/// Channel to request the actor to exit.
shutdown: mpsc::Sender<ShutdownReply>,
/// [AbortHandle] to force cancellation of the [Actor].
abort: AbortHandle,
}
impl<T: Encode + Send + Sync + 'static> Local<T> {
/// Create a [`Local`] instance at the `replica_dir`.
///
/// `replica_dir` must already exist.
///
/// Background tasks are spawned onto the provided tokio runtime.
///
/// We will send a message down the `on_new_segment` channel whenever we begin a new commitlog segment.
/// This is used to capture a snapshot each new segment.
pub fn open(
replica_dir: ReplicaDir,
rt: tokio::runtime::Handle,
opts: Options,
on_new_segment: Option<Arc<OnNewSegmentFn>>,
) -> Result<Self, OpenError> {
info!("open local durability");
// We could just place a lock on the commitlog directory,
// yet for backwards-compatibility, we keep using the `db.lock` file.
let lock = Lock::create(replica_dir.0.join("db.lock"))?;
let clog = Arc::new(Commitlog::open(
replica_dir.commit_log(),
opts.commitlog,
on_new_segment,
)?);
let (queue, txdata_rx) = mpsc::unbounded_channel();
let queue_depth = Arc::new(AtomicU64::new(0));
let (durable_tx, durable_rx) = watch::channel(clog.max_committed_offset());
let (shutdown_tx, shutdown_rx) = mpsc::channel(1);
let abort = rt
.spawn(
Actor {
clog: clog.clone(),
durable_offset: durable_tx,
queue_depth: queue_depth.clone(),
batch_capacity: opts.batch_capacity,
lock,
}
.run(txdata_rx, shutdown_rx),
)
.abort_handle();
Ok(Self {
clog,
durable_offset: durable_rx,
queue,
shutdown: shutdown_tx,
queue_depth,
abort,
})
}
/// Obtain a read-only copy of the durable state that implements [History].
pub fn as_history(&self) -> impl History<TxData = Txdata<T>> {
self.clog.clone()
}
}
impl<T: Send + Sync + 'static> Local<T> {
/// Inspect how many transactions added via [`Self::append_tx`] are pending
/// to be applied to the underlying [`Commitlog`].
pub fn queue_depth(&self) -> u64 {
self.queue_depth.load(Relaxed)
}
/// Obtain an iterator over the [`Commit`]s in the underlying log.
pub fn commits_from(&self, offset: TxOffset) -> impl Iterator<Item = Result<Commit, error::Traversal>> {
self.clog.commits_from(offset).map_ok(Commit::from)
}
/// Get a list of segment offsets, sorted in ascending order.
pub fn existing_segment_offsets(&self) -> io::Result<Vec<TxOffset>> {
self.clog.existing_segment_offsets()
}
/// Compress the segments at the offsets provided, marking them as immutable.
pub fn compress_segments(&self, offsets: &[TxOffset]) -> io::Result<()> {
self.clog.compress_segments(offsets)
}
/// Get the size on disk of the underlying [`Commitlog`].
pub fn size_on_disk(&self) -> io::Result<SizeOnDisk> {
self.clog.size_on_disk()
}
}
struct Actor<T> {
clog: Arc<Commitlog<Txdata<T>>>,
durable_offset: watch::Sender<Option<TxOffset>>,
queue_depth: Arc<AtomicU64>,
batch_capacity: NonZeroUsize,
#[allow(unused)]
lock: Lock,
}
impl<T: Encode + Send + Sync + 'static> Actor<T> {
#[instrument(name = "durability::local::actor", skip_all)]
async fn run(
self,
mut transactions_rx: mpsc::UnboundedReceiver<Transaction<Txdata<T>>>,
mut shutdown_rx: mpsc::Receiver<oneshot::Sender<OwnedNotified>>,
) {
info!("starting durability actor");
let mut tx_buf = Vec::with_capacity(self.batch_capacity.get());
// `flush_and_sync` when the loop exits without panicking,
// or `flush_and_sync` inside the loop failed.
let mut sync_on_exit = true;
loop {
tokio::select! {
// Biased towards the shutdown channel,
// so that we stop accepting new data promptly after
// `Durability::close` was called.
biased;
Some(reply) = shutdown_rx.recv() => {
transactions_rx.close();
let _ = reply.send(self.lock.notified());
},
// Pop as many elements from the channel as possible,
// potentially requiring the `tx_buf` to allocate additional
// capacity.
// We'll reclaim capacity in excess of `self.batch_size` below.
n = transactions_rx.recv_many(&mut tx_buf, usize::MAX) => {
if n == 0 {
break;
}
self.queue_depth.fetch_sub(n as u64, Relaxed);
let clog = self.clog.clone();
tx_buf = spawn_blocking(move || -> io::Result<Vec<Transaction<Txdata<T>>>> {
for tx in tx_buf.drain(..) {
clog.commit([tx])?;
}
Ok(tx_buf)
})
.await
.expect("commitlog write panicked")
.expect("commitlog write failed");
if self.flush_and_sync().await.is_err() {
sync_on_exit = false;
break;
}
// Reclaim burst capacity.
tx_buf.shrink_to(self.batch_capacity.get());
},
}
}
if sync_on_exit {
let _ = self.flush_and_sync().await;
}
info!("exiting durability actor");
}
#[instrument(skip_all)]
async fn flush_and_sync(&self) -> io::Result<Option<TxOffset>> {
// Skip if nothing changed.
if let Some((committed, durable)) = self.clog.max_committed_offset().zip(*self.durable_offset.borrow()) {
if committed == durable {
return Ok(None);
}
}
let clog = self.clog.clone();
let span = Span::current();
spawn_blocking(move || {
let _span = span.enter();
clog.flush_and_sync()
})
.await
.expect("commitlog flush-and-sync blocking task panicked")
.inspect_err(|e| warn!("error flushing commitlog: {e:#}"))
.inspect(|maybe_offset| {
if let Some(new_offset) = maybe_offset {
trace!("synced to offset {new_offset}");
self.durable_offset.send_modify(|val| {
val.replace(*new_offset);
});
}
})
}
}
struct Lock {
file: Option<LockedFile>,
notify_on_drop: Arc<Notify>,
}
impl Lock {
pub fn create(path: PathBuf) -> Result<Self, LockError> {
let file = LockedFile::lock(path).map(Some)?;
let notify_on_drop = Arc::new(Notify::new());
Ok(Self { file, notify_on_drop })
}
pub fn notified(&self) -> OwnedNotified {
self.notify_on_drop.clone().notified_owned()
}
}
impl Drop for Lock {
fn drop(&mut self) {
// Ensure the file lock is dropped before notifying.
if let Some(file) = self.file.take() {
drop(file);
}
self.notify_on_drop.notify_waiters();
}
}
impl<T: Send + Sync + 'static> Durability for Local<T> {
type TxData = Txdata<T>;
fn append_tx(&self, tx: Transaction<Self::TxData>) {
self.queue.send(tx).expect("durability actor crashed");
self.queue_depth.fetch_add(1, Relaxed);
}
fn durable_tx_offset(&self) -> DurableOffset {
self.durable_offset.clone().into()
}
fn close(&self) -> Close {
info!("close local durability");
let durable_offset = self.durable_tx_offset();
let shutdown = self.shutdown.clone();
// Abort actor if shutdown future is dropped.
let abort = scopeguard::guard(self.abort.clone(), |actor| {
warn!("close future dropped, aborting durability actor");
actor.abort();
});
async move {
let (done_tx, done_rx) = oneshot::channel();
// Ignore channel errors - those just mean the actor is already gone.
let _ = shutdown
.send(done_tx)
.map_err(drop)
.and_then(|()| done_rx.map_err(drop))
.and_then(|done| async move {
done.await;
Ok(())
})
.await;
// Don't abort if we completed normally.
let _ = ScopeGuard::into_inner(abort);
durable_offset.last_seen()
}
.boxed()
}
}
impl<T: Encode + 'static> History for Commitlog<Txdata<T>> {
type TxData = Txdata<T>;
fn fold_transactions_from<D>(&self, offset: TxOffset, decoder: D) -> Result<(), D::Error>
where
D: Decoder,
D::Error: From<error::Traversal>,
{
self.fold_transactions_from(offset, decoder)
}
fn transactions_from<'a, D>(
&self,
offset: TxOffset,
decoder: &'a D,
) -> impl Iterator<Item = Result<Transaction<Self::TxData>, D::Error>>
where
D: Decoder<Record = Self::TxData>,
D::Error: From<error::Traversal>,
Self::TxData: 'a,
{
self.transactions_from(offset, decoder)
}
fn tx_range_hint(&self) -> (TxOffset, Option<TxOffset>) {
let min = self.min_committed_offset().unwrap_or_default();
let max = self.max_committed_offset();
(min, max)
}
}
+1 -1
@@ -73,7 +73,7 @@ async fn local_durability_cannot_be_created_if_not_enough_space() -> anyhow::Res
// In reality, `append_tx` will fail at some point in the future.
// I.e. transactions can be lost when the host runs out of disk space.
#[tokio::test(flavor = "multi_thread")]
#[should_panic = "durability actor crashed"]
#[should_panic = "local durability: actor vanished"]
async fn local_durability_crashes_on_new_segment_if_not_enough_space() {
enable_logging();