mirror of
https://github.com/clockworklabs/SpacetimeDB.git
synced 2026-05-06 23:59:43 -04:00
durability: Use async-channel to allow blocking send (#4802)
The previous approaches would either: - panic when the queue becomes full, as `append_tx` is run inside the context of a `LocalSet`, which is basically a glorified current thread runtime - deadlock because the receiver runtime has no way of notifiying the sender of freed capacity in the channel `async-channel` handles wait queues and notifications internally, so can be used freely from either blocking or async contexts. This _may_ come at different performance characteristics, but I haven't measured them. --------- Co-authored-by: joshua-spacetime <josh@clockworklabs.io>
This commit is contained in:
Generated
+49
@@ -231,6 +231,18 @@ dependencies = [
|
||||
"wait-timeout",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "async-channel"
|
||||
version = "2.5.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "924ed96dd52d1b75e9c1a3e6275715fd320f5f9439fb5a4a11fa51f4221158d2"
|
||||
dependencies = [
|
||||
"concurrent-queue",
|
||||
"event-listener-strategy",
|
||||
"futures-core",
|
||||
"pin-project-lite",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "async-scoped"
|
||||
version = "0.9.0"
|
||||
@@ -1086,6 +1098,15 @@ dependencies = [
|
||||
"static_assertions",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "concurrent-queue"
|
||||
version = "2.5.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "4ca0197aee26d1ae37445ee532fefce43251d24cc7c166799f4d46817f1d3973"
|
||||
dependencies = [
|
||||
"crossbeam-utils",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "concurrent_lru"
|
||||
version = "0.2.0"
|
||||
@@ -2106,6 +2127,27 @@ dependencies = [
|
||||
"serde",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "event-listener"
|
||||
version = "5.4.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e13b66accf52311f30a0db42147dadea9850cb48cd070028831ae5f5d4b856ab"
|
||||
dependencies = [
|
||||
"concurrent-queue",
|
||||
"parking",
|
||||
"pin-project-lite",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "event-listener-strategy"
|
||||
version = "0.5.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "8be9f3dfaaffdae2972880079a491a1a8bb7cbed0b8dd7a347f668b4150a3b93"
|
||||
dependencies = [
|
||||
"event-listener",
|
||||
"pin-project-lite",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "event-table-client"
|
||||
version = "2.1.0"
|
||||
@@ -5153,6 +5195,12 @@ dependencies = [
|
||||
"unicode-width 0.1.14",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "parking"
|
||||
version = "2.2.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f38d5652c16fde515bb1ecef450ab0f6a219d619a7274976324d5e377f7dceba"
|
||||
|
||||
[[package]]
|
||||
name = "parking_lot"
|
||||
version = "0.11.2"
|
||||
@@ -8125,6 +8173,7 @@ name = "spacetimedb-durability"
|
||||
version = "2.1.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"async-channel",
|
||||
"futures",
|
||||
"itertools 0.12.1",
|
||||
"log",
|
||||
|
||||
@@ -153,6 +153,7 @@ ahash = { version = "0.8", default-features = false, features = ["std"] }
|
||||
anyhow = "1.0.68"
|
||||
anymap = "0.12"
|
||||
arrayvec = "0.7.2"
|
||||
async-channel = "2.5"
|
||||
async-stream = "0.3.6"
|
||||
async-trait = "0.1.68"
|
||||
axum = { version = "0.7", features = ["tracing"] }
|
||||
|
||||
@@ -13,6 +13,7 @@ fallocate = ["spacetimedb-commitlog/fallocate"]
|
||||
|
||||
[dependencies]
|
||||
anyhow.workspace = true
|
||||
async-channel.workspace = true
|
||||
futures.workspace = true
|
||||
itertools.workspace = true
|
||||
log.workspace = true
|
||||
|
||||
@@ -97,7 +97,7 @@ pub struct Local<T> {
|
||||
///
|
||||
/// The queue is bounded to
|
||||
/// `Options::QUEUE_CAPACITY_MULTIPLIER * Options::batch_capacity`.
|
||||
queue: mpsc::Sender<PreparedTx<Txdata<T>>>,
|
||||
queue: async_channel::Sender<PreparedTx<Txdata<T>>>,
|
||||
/// How many transactions are pending durability, including items buffered
|
||||
/// in the queue and items currently being written by the actor.
|
||||
///
|
||||
@@ -137,7 +137,7 @@ impl<T: Encode + Send + Sync + 'static> Local<T> {
|
||||
on_new_segment,
|
||||
)?);
|
||||
let queue_capacity = opts.queue_capacity();
|
||||
let (queue, txdata_rx) = mpsc::channel(queue_capacity);
|
||||
let (queue, txdata_rx) = async_channel::bounded(queue_capacity);
|
||||
let queue_depth = Arc::new(AtomicU64::new(0));
|
||||
let (durable_tx, durable_rx) = watch::channel(clog.max_committed_offset());
|
||||
let (shutdown_tx, shutdown_rx) = mpsc::channel(1);
|
||||
@@ -218,7 +218,7 @@ impl<T: Encode + Send + Sync + 'static> Actor<T> {
|
||||
#[instrument(name = "durability::local::actor", skip_all)]
|
||||
async fn run(
|
||||
self,
|
||||
mut transactions_rx: mpsc::Receiver<PreparedTx<Txdata<T>>>,
|
||||
transactions_rx: async_channel::Receiver<PreparedTx<Txdata<T>>>,
|
||||
mut shutdown_rx: mpsc::Receiver<oneshot::Sender<OwnedNotified>>,
|
||||
) {
|
||||
info!("starting durability actor");
|
||||
@@ -244,7 +244,7 @@ impl<T: Encode + Send + Sync + 'static> Actor<T> {
|
||||
// potentially requiring the `tx_buf` to allocate additional
|
||||
// capacity.
|
||||
// We'll reclaim capacity in excess of `self.batch_size` below.
|
||||
n = transactions_rx.recv_many(&mut tx_buf, usize::MAX) => {
|
||||
n = recv_many(&transactions_rx, &mut tx_buf, usize::MAX) => {
|
||||
if n == 0 {
|
||||
break;
|
||||
}
|
||||
@@ -344,29 +344,8 @@ impl<T: Send + Sync + 'static> Durability for Local<T> {
|
||||
type TxData = Txdata<T>;
|
||||
|
||||
fn append_tx(&self, tx: PreparedTx<Self::TxData>) {
|
||||
let mut tx = Some(tx);
|
||||
let blocked = match self.queue.try_reserve() {
|
||||
Ok(permit) => {
|
||||
permit.send(tx.take().expect("tx already sent"));
|
||||
false
|
||||
}
|
||||
Err(mpsc::error::TrySendError::Closed(_)) => {
|
||||
panic!("durability actor crashed");
|
||||
}
|
||||
Err(mpsc::error::TrySendError::Full(_)) => {
|
||||
let mut send = || self.queue.blocking_send(tx.take().expect("tx already sent"));
|
||||
if tokio::runtime::Handle::try_current().is_ok() {
|
||||
tokio::task::block_in_place(send)
|
||||
} else {
|
||||
send()
|
||||
}
|
||||
.expect("durability actor crashed");
|
||||
true
|
||||
}
|
||||
};
|
||||
|
||||
self.queue.send_blocking(tx).expect("local durability: actor vanished");
|
||||
self.queue_depth.fetch_add(1, Relaxed);
|
||||
let _ = blocked;
|
||||
}
|
||||
|
||||
fn durable_tx_offset(&self) -> DurableOffset {
|
||||
@@ -436,3 +415,28 @@ impl<T: Encode + 'static> History for Commitlog<Txdata<T>> {
|
||||
(min, max)
|
||||
}
|
||||
}
|
||||
|
||||
/// Implement tokio's `recv_many` for an `async_channel` receiver.
|
||||
async fn recv_many<T>(chan: &async_channel::Receiver<T>, buf: &mut Vec<T>, limit: usize) -> usize {
|
||||
let mut n = 0;
|
||||
if !chan.is_empty() {
|
||||
buf.reserve(chan.len().min(limit));
|
||||
while n < limit {
|
||||
let Ok(val) = chan.try_recv() else {
|
||||
break;
|
||||
};
|
||||
buf.push(val);
|
||||
n += 1;
|
||||
}
|
||||
}
|
||||
|
||||
if n == 0 {
|
||||
let Ok(val) = chan.recv().await else {
|
||||
return n;
|
||||
};
|
||||
buf.push(val);
|
||||
n += 1;
|
||||
}
|
||||
|
||||
n
|
||||
}
|
||||
|
||||
@@ -1,416 +0,0 @@
|
||||
use std::{
|
||||
io,
|
||||
num::NonZeroUsize,
|
||||
path::PathBuf,
|
||||
sync::{
|
||||
atomic::{AtomicU64, Ordering::Relaxed},
|
||||
Arc,
|
||||
},
|
||||
};
|
||||
|
||||
use futures::{FutureExt as _, TryFutureExt as _};
|
||||
use itertools::Itertools as _;
|
||||
use log::{info, trace, warn};
|
||||
use scopeguard::ScopeGuard;
|
||||
use spacetimedb_commitlog::{error, payload::Txdata, Commit, Commitlog, Decoder, Encode, Transaction};
|
||||
use spacetimedb_fs_utils::lockfile::advisory::{LockError, LockedFile};
|
||||
use spacetimedb_paths::server::ReplicaDir;
|
||||
use thiserror::Error;
|
||||
use tokio::{
|
||||
sync::{futures::OwnedNotified, mpsc, oneshot, watch, Notify},
|
||||
task::{spawn_blocking, AbortHandle},
|
||||
};
|
||||
use tracing::{instrument, Span};
|
||||
|
||||
use crate::{Close, Durability, DurableOffset, History, TxOffset};
|
||||
|
||||
pub use spacetimedb_commitlog::repo::{OnNewSegmentFn, SizeOnDisk};
|
||||
|
||||
/// [`Local`] configuration.
|
||||
#[derive(Clone, Copy, Debug)]
|
||||
pub struct Options {
|
||||
<<<<<<< conflict 1 of 2
|
||||
+++++++ ssmwmsoq d6e2ba51 "durability: Flush batches (#4478)"
|
||||
/// The number of elements to reserve for batching transactions.
|
||||
///
|
||||
/// This puts an upper bound on the buffer capacity, while not preventing
|
||||
/// reallocations when the number of queued transactions exceeds it.
|
||||
///
|
||||
/// In other words, the durability actor will attempt to receive all
|
||||
/// transactions that are currently in the queue, but shrink the buffer to
|
||||
/// `batch_capacity` if it had to make additional space during a burst.
|
||||
///
|
||||
/// Default: 4096
|
||||
pub batch_capacity: NonZeroUsize,
|
||||
%%%%%%% diff from: ttnusruw 6fea15f7 "[C#] Update RawTableIterBase.Enumerator to use `ArrayPool` for buffer (#4385)"
|
||||
\\\\\\\ to: ppnmwost abbcec4a "keynote-2: alpha -> 1.5, withConfirmedReads(true), remove warmup (#4492)"
|
||||
/// Periodically flush and sync the log this often.
|
||||
///
|
||||
- /// Default: 50ms
|
||||
+ /// Default: 10ms
|
||||
pub sync_interval: Duration,
|
||||
>>>>>>> conflict 1 of 2 ends
|
||||
/// [`Commitlog`] configuration.
|
||||
pub commitlog: spacetimedb_commitlog::Options,
|
||||
}
|
||||
|
||||
impl Options {
|
||||
pub const DEFAULT_BATCH_CAPACITY: NonZeroUsize = NonZeroUsize::new(4096).unwrap();
|
||||
}
|
||||
|
||||
impl Default for Options {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
<<<<<<< conflict 2 of 2
|
||||
+++++++ ssmwmsoq d6e2ba51 "durability: Flush batches (#4478)"
|
||||
batch_capacity: Self::DEFAULT_BATCH_CAPACITY,
|
||||
%%%%%%% diff from: ttnusruw 6fea15f7 "[C#] Update RawTableIterBase.Enumerator to use `ArrayPool` for buffer (#4385)"
|
||||
\\\\\\\ to: ppnmwost abbcec4a "keynote-2: alpha -> 1.5, withConfirmedReads(true), remove warmup (#4492)"
|
||||
- sync_interval: Duration::from_millis(50),
|
||||
+ sync_interval: Duration::from_millis(10),
|
||||
>>>>>>> conflict 2 of 2 ends
|
||||
commitlog: Default::default(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Error)]
|
||||
pub enum OpenError {
|
||||
#[error("commitlog directory is locked")]
|
||||
Lock(#[from] LockError),
|
||||
#[error("failed to open commitlog")]
|
||||
Commitlog(#[from] io::Error),
|
||||
}
|
||||
|
||||
type ShutdownReply = oneshot::Sender<OwnedNotified>;
|
||||
|
||||
/// [`Durability`] implementation backed by a [`Commitlog`] on local storage.
|
||||
///
|
||||
/// The commitlog is constrained to store the canonical [`Txdata`] payload,
|
||||
/// where the generic parameter `T` is the type of the row data stored in
|
||||
/// the mutations section.
|
||||
///
|
||||
/// `T` is left generic in order to allow bypassing the `ProductValue`
|
||||
/// intermediate representation in the future.
|
||||
///
|
||||
/// Note, however, that instantiating `T` to a different type may require to
|
||||
/// change the log format version!
|
||||
pub struct Local<T> {
|
||||
/// The [`Commitlog`] this [`Durability`] and [`History`] impl wraps.
|
||||
clog: Arc<Commitlog<Txdata<T>>>,
|
||||
/// The durable transaction offset, as reported by the background
|
||||
/// [`FlushAndSyncTask`].
|
||||
durable_offset: watch::Receiver<Option<TxOffset>>,
|
||||
/// Backlog of transactions to be written to disk by the background
|
||||
/// [`PersisterTask`].
|
||||
///
|
||||
/// Note that this is unbounded!
|
||||
queue: mpsc::UnboundedSender<Transaction<Txdata<T>>>,
|
||||
/// How many transactions are sitting in the `queue`.
|
||||
///
|
||||
/// This is mainly for observability purposes, and can thus be updated with
|
||||
/// relaxed memory ordering.
|
||||
queue_depth: Arc<AtomicU64>,
|
||||
/// Channel to request the actor to exit.
|
||||
shutdown: mpsc::Sender<ShutdownReply>,
|
||||
/// [AbortHandle] to force cancellation of the [Actor].
|
||||
abort: AbortHandle,
|
||||
}
|
||||
|
||||
impl<T: Encode + Send + Sync + 'static> Local<T> {
|
||||
/// Create a [`Local`] instance at the `replica_dir`.
|
||||
///
|
||||
/// `replica_dir` must already exist.
|
||||
///
|
||||
/// Background tasks are spawned onto the provided tokio runtime.
|
||||
///
|
||||
/// We will send a message down the `on_new_segment` channel whenever we begin a new commitlog segment.
|
||||
/// This is used to capture a snapshot each new segment.
|
||||
pub fn open(
|
||||
replica_dir: ReplicaDir,
|
||||
rt: tokio::runtime::Handle,
|
||||
opts: Options,
|
||||
on_new_segment: Option<Arc<OnNewSegmentFn>>,
|
||||
) -> Result<Self, OpenError> {
|
||||
info!("open local durability");
|
||||
|
||||
// We could just place a lock on the commitlog directory,
|
||||
// yet for backwards-compatibility, we keep using the `db.lock` file.
|
||||
let lock = Lock::create(replica_dir.0.join("db.lock"))?;
|
||||
|
||||
let clog = Arc::new(Commitlog::open(
|
||||
replica_dir.commit_log(),
|
||||
opts.commitlog,
|
||||
on_new_segment,
|
||||
)?);
|
||||
let (queue, txdata_rx) = mpsc::unbounded_channel();
|
||||
let queue_depth = Arc::new(AtomicU64::new(0));
|
||||
let (durable_tx, durable_rx) = watch::channel(clog.max_committed_offset());
|
||||
let (shutdown_tx, shutdown_rx) = mpsc::channel(1);
|
||||
|
||||
let abort = rt
|
||||
.spawn(
|
||||
Actor {
|
||||
clog: clog.clone(),
|
||||
|
||||
durable_offset: durable_tx,
|
||||
queue_depth: queue_depth.clone(),
|
||||
|
||||
batch_capacity: opts.batch_capacity,
|
||||
|
||||
lock,
|
||||
}
|
||||
.run(txdata_rx, shutdown_rx),
|
||||
)
|
||||
.abort_handle();
|
||||
|
||||
Ok(Self {
|
||||
clog,
|
||||
durable_offset: durable_rx,
|
||||
queue,
|
||||
shutdown: shutdown_tx,
|
||||
queue_depth,
|
||||
abort,
|
||||
})
|
||||
}
|
||||
|
||||
/// Obtain a read-only copy of the durable state that implements [History].
|
||||
pub fn as_history(&self) -> impl History<TxData = Txdata<T>> {
|
||||
self.clog.clone()
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: Send + Sync + 'static> Local<T> {
|
||||
/// Inspect how many transactions added via [`Self::append_tx`] are pending
|
||||
/// to be applied to the underlying [`Commitlog`].
|
||||
pub fn queue_depth(&self) -> u64 {
|
||||
self.queue_depth.load(Relaxed)
|
||||
}
|
||||
|
||||
/// Obtain an iterator over the [`Commit`]s in the underlying log.
|
||||
pub fn commits_from(&self, offset: TxOffset) -> impl Iterator<Item = Result<Commit, error::Traversal>> {
|
||||
self.clog.commits_from(offset).map_ok(Commit::from)
|
||||
}
|
||||
|
||||
/// Get a list of segment offsets, sorted in ascending order.
|
||||
pub fn existing_segment_offsets(&self) -> io::Result<Vec<TxOffset>> {
|
||||
self.clog.existing_segment_offsets()
|
||||
}
|
||||
|
||||
/// Compress the segments at the offsets provided, marking them as immutable.
|
||||
pub fn compress_segments(&self, offsets: &[TxOffset]) -> io::Result<()> {
|
||||
self.clog.compress_segments(offsets)
|
||||
}
|
||||
|
||||
/// Get the size on disk of the underlying [`Commitlog`].
|
||||
pub fn size_on_disk(&self) -> io::Result<SizeOnDisk> {
|
||||
self.clog.size_on_disk()
|
||||
}
|
||||
}
|
||||
|
||||
struct Actor<T> {
|
||||
clog: Arc<Commitlog<Txdata<T>>>,
|
||||
|
||||
durable_offset: watch::Sender<Option<TxOffset>>,
|
||||
queue_depth: Arc<AtomicU64>,
|
||||
|
||||
batch_capacity: NonZeroUsize,
|
||||
|
||||
#[allow(unused)]
|
||||
lock: Lock,
|
||||
}
|
||||
|
||||
impl<T: Encode + Send + Sync + 'static> Actor<T> {
|
||||
#[instrument(name = "durability::local::actor", skip_all)]
|
||||
async fn run(
|
||||
self,
|
||||
mut transactions_rx: mpsc::UnboundedReceiver<Transaction<Txdata<T>>>,
|
||||
mut shutdown_rx: mpsc::Receiver<oneshot::Sender<OwnedNotified>>,
|
||||
) {
|
||||
info!("starting durability actor");
|
||||
|
||||
let mut tx_buf = Vec::with_capacity(self.batch_capacity.get());
|
||||
// `flush_and_sync` when the loop exits without panicking,
|
||||
// or `flush_and_sync` inside the loop failed.
|
||||
let mut sync_on_exit = true;
|
||||
|
||||
loop {
|
||||
tokio::select! {
|
||||
// Biased towards the shutdown channel,
|
||||
// so that we stop accepting new data promptly after
|
||||
// `Durability::close` was called.
|
||||
biased;
|
||||
|
||||
Some(reply) = shutdown_rx.recv() => {
|
||||
transactions_rx.close();
|
||||
let _ = reply.send(self.lock.notified());
|
||||
},
|
||||
|
||||
// Pop as many elements from the channel as possible,
|
||||
// potentially requiring the `tx_buf` to allocate additional
|
||||
// capacity.
|
||||
// We'll reclaim capacity in excess of `self.batch_size` below.
|
||||
n = transactions_rx.recv_many(&mut tx_buf, usize::MAX) => {
|
||||
if n == 0 {
|
||||
break;
|
||||
}
|
||||
self.queue_depth.fetch_sub(n as u64, Relaxed);
|
||||
let clog = self.clog.clone();
|
||||
tx_buf = spawn_blocking(move || -> io::Result<Vec<Transaction<Txdata<T>>>> {
|
||||
for tx in tx_buf.drain(..) {
|
||||
clog.commit([tx])?;
|
||||
}
|
||||
Ok(tx_buf)
|
||||
})
|
||||
.await
|
||||
.expect("commitlog write panicked")
|
||||
.expect("commitlog write failed");
|
||||
if self.flush_and_sync().await.is_err() {
|
||||
sync_on_exit = false;
|
||||
break;
|
||||
}
|
||||
// Reclaim burst capacity.
|
||||
tx_buf.shrink_to(self.batch_capacity.get());
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
if sync_on_exit {
|
||||
let _ = self.flush_and_sync().await;
|
||||
}
|
||||
|
||||
info!("exiting durability actor");
|
||||
}
|
||||
|
||||
#[instrument(skip_all)]
|
||||
async fn flush_and_sync(&self) -> io::Result<Option<TxOffset>> {
|
||||
// Skip if nothing changed.
|
||||
if let Some((committed, durable)) = self.clog.max_committed_offset().zip(*self.durable_offset.borrow()) {
|
||||
if committed == durable {
|
||||
return Ok(None);
|
||||
}
|
||||
}
|
||||
|
||||
let clog = self.clog.clone();
|
||||
let span = Span::current();
|
||||
spawn_blocking(move || {
|
||||
let _span = span.enter();
|
||||
clog.flush_and_sync()
|
||||
})
|
||||
.await
|
||||
.expect("commitlog flush-and-sync blocking task panicked")
|
||||
.inspect_err(|e| warn!("error flushing commitlog: {e:#}"))
|
||||
.inspect(|maybe_offset| {
|
||||
if let Some(new_offset) = maybe_offset {
|
||||
trace!("synced to offset {new_offset}");
|
||||
self.durable_offset.send_modify(|val| {
|
||||
val.replace(*new_offset);
|
||||
});
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
struct Lock {
|
||||
file: Option<LockedFile>,
|
||||
notify_on_drop: Arc<Notify>,
|
||||
}
|
||||
|
||||
impl Lock {
|
||||
pub fn create(path: PathBuf) -> Result<Self, LockError> {
|
||||
let file = LockedFile::lock(path).map(Some)?;
|
||||
let notify_on_drop = Arc::new(Notify::new());
|
||||
|
||||
Ok(Self { file, notify_on_drop })
|
||||
}
|
||||
|
||||
pub fn notified(&self) -> OwnedNotified {
|
||||
self.notify_on_drop.clone().notified_owned()
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for Lock {
|
||||
fn drop(&mut self) {
|
||||
// Ensure the file lock is dropped before notifying.
|
||||
if let Some(file) = self.file.take() {
|
||||
drop(file);
|
||||
}
|
||||
self.notify_on_drop.notify_waiters();
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: Send + Sync + 'static> Durability for Local<T> {
|
||||
type TxData = Txdata<T>;
|
||||
|
||||
fn append_tx(&self, tx: Transaction<Self::TxData>) {
|
||||
self.queue.send(tx).expect("durability actor crashed");
|
||||
self.queue_depth.fetch_add(1, Relaxed);
|
||||
}
|
||||
|
||||
fn durable_tx_offset(&self) -> DurableOffset {
|
||||
self.durable_offset.clone().into()
|
||||
}
|
||||
|
||||
fn close(&self) -> Close {
|
||||
info!("close local durability");
|
||||
|
||||
let durable_offset = self.durable_tx_offset();
|
||||
let shutdown = self.shutdown.clone();
|
||||
// Abort actor if shutdown future is dropped.
|
||||
let abort = scopeguard::guard(self.abort.clone(), |actor| {
|
||||
warn!("close future dropped, aborting durability actor");
|
||||
actor.abort();
|
||||
});
|
||||
|
||||
async move {
|
||||
let (done_tx, done_rx) = oneshot::channel();
|
||||
// Ignore channel errors - those just mean the actor is already gone.
|
||||
let _ = shutdown
|
||||
.send(done_tx)
|
||||
.map_err(drop)
|
||||
.and_then(|()| done_rx.map_err(drop))
|
||||
.and_then(|done| async move {
|
||||
done.await;
|
||||
Ok(())
|
||||
})
|
||||
.await;
|
||||
// Don't abort if we completed normally.
|
||||
let _ = ScopeGuard::into_inner(abort);
|
||||
|
||||
durable_offset.last_seen()
|
||||
}
|
||||
.boxed()
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: Encode + 'static> History for Commitlog<Txdata<T>> {
|
||||
type TxData = Txdata<T>;
|
||||
|
||||
fn fold_transactions_from<D>(&self, offset: TxOffset, decoder: D) -> Result<(), D::Error>
|
||||
where
|
||||
D: Decoder,
|
||||
D::Error: From<error::Traversal>,
|
||||
{
|
||||
self.fold_transactions_from(offset, decoder)
|
||||
}
|
||||
|
||||
fn transactions_from<'a, D>(
|
||||
&self,
|
||||
offset: TxOffset,
|
||||
decoder: &'a D,
|
||||
) -> impl Iterator<Item = Result<Transaction<Self::TxData>, D::Error>>
|
||||
where
|
||||
D: Decoder<Record = Self::TxData>,
|
||||
D::Error: From<error::Traversal>,
|
||||
Self::TxData: 'a,
|
||||
{
|
||||
self.transactions_from(offset, decoder)
|
||||
}
|
||||
|
||||
fn tx_range_hint(&self) -> (TxOffset, Option<TxOffset>) {
|
||||
let min = self.min_committed_offset().unwrap_or_default();
|
||||
let max = self.max_committed_offset();
|
||||
|
||||
(min, max)
|
||||
}
|
||||
}
|
||||
@@ -73,7 +73,7 @@ async fn local_durability_cannot_be_created_if_not_enough_space() -> anyhow::Res
|
||||
// In reality, `append_tx` will fail at some point in the future.
|
||||
// I.e. transactions can be lost when the host runs out of disk space.
|
||||
#[tokio::test(flavor = "multi_thread")]
|
||||
#[should_panic = "durability actor crashed"]
|
||||
#[should_panic = "local durability: actor vanished"]
|
||||
async fn local_durability_crashes_on_new_segment_if_not_enough_space() {
|
||||
enable_logging();
|
||||
|
||||
|
||||
Reference in New Issue
Block a user