Replay: some code motion & reuse ReplayCommittedState (#4849)

# Description of Changes

First two commits are code motion.
The third commit fixes a mistake I made in a previous PR that made us
use potentially several `ReplayCommittedState`s per
`datastore.replay(..)`.

More to come in terms of PRs; stay tuned.

# API and ABI breaking changes

None

# Expected complexity level and risk

3
This commit is contained in:
Mazdak Farrokhzad
2026-04-22 13:23:06 +02:00
committed by GitHub
parent 91494c9cf2
commit d639be0af6
4 changed files with 150 additions and 119 deletions
+14 -56
View File
@@ -17,7 +17,7 @@ use spacetimedb_datastore::locking_tx_datastore::datastore::TxMetrics;
use spacetimedb_datastore::locking_tx_datastore::state_view::{
IterByColEqMutTx, IterByColRangeMutTx, IterMutTx, StateView,
};
use spacetimedb_datastore::locking_tx_datastore::{IndexScanPointOrRange, MutTxId, TxId};
use spacetimedb_datastore::locking_tx_datastore::{ApplyHistoryCounters, IndexScanPointOrRange, MutTxId, TxId};
use spacetimedb_datastore::system_tables::{
system_tables, StModuleRow, ST_CLIENT_ID, ST_CONNECTION_CREDENTIALS_ID, ST_VIEW_SUB_ID,
};
@@ -1617,62 +1617,20 @@ impl RelationalDB {
}
}
fn apply_history<H>(datastore: &Locking, database_identity: Identity, history: H) -> Result<(), DBError>
where
H: durability::History<TxData = Txdata>,
{
log::info!("[{database_identity}] DATABASE: applying transaction history...");
// TODO: Revisit once we actually replay history suffixes, ie. starting
// from an offset larger than the history's min offset.
// TODO: We may want to require that a `tokio::runtime::Handle` is
// always supplied when constructing a `RelationalDB`. This would allow
// to spawn a timer task here which just prints the progress periodically
// in case the history is finite but very long.
let (_, max_tx_offset) = history.tx_range_hint();
let mut last_logged_percentage = 0;
let progress = |tx_offset: u64| {
if let Some(max_tx_offset) = max_tx_offset {
let percentage = f64::floor((tx_offset as f64 / max_tx_offset as f64) * 100.0) as i32;
if percentage > last_logged_percentage && percentage % 10 == 0 {
log::info!("[{database_identity}] Loaded {percentage}% ({tx_offset}/{max_tx_offset})");
last_logged_percentage = percentage;
}
// Print _something_ even if we don't know what's still ahead.
} else if tx_offset.is_multiple_of(10_000) {
log::info!("[{database_identity}] Loading transaction {tx_offset}");
}
fn apply_history(
datastore: &Locking,
database_identity: Identity,
history: impl durability::History<TxData = Txdata>,
) -> Result<(), DBError> {
let counters = ApplyHistoryCounters {
replay_commitlog_time_seconds: WORKER_METRICS
.replay_commitlog_time_seconds
.with_label_values(&database_identity),
replay_commitlog_num_commits: WORKER_METRICS
.replay_commitlog_num_commits
.with_label_values(&database_identity),
};
let time_before = std::time::Instant::now();
let mut replay = datastore.replay(
progress,
// We don't want to instantiate an incorrect state;
// if the commitlog contains an inconsistency we'd rather get a hard error than showing customers incorrect data.
spacetimedb_datastore::locking_tx_datastore::ErrorBehavior::FailFast,
);
let start_tx_offset = replay.next_tx_offset();
history
.fold_transactions_from(start_tx_offset, &mut replay)
.map_err(anyhow::Error::from)?;
let time_elapsed = time_before.elapsed();
WORKER_METRICS
.replay_commitlog_time_seconds
.with_label_values(&database_identity)
.set(time_elapsed.as_secs_f64());
let end_tx_offset = replay.next_tx_offset();
WORKER_METRICS
.replay_commitlog_num_commits
.with_label_values(&database_identity)
.set((end_tx_offset - start_tx_offset) as _);
log::info!("[{database_identity}] DATABASE: applied transaction history");
datastore.rebuild_state_after_replay()?;
log::info!("[{database_identity}] DATABASE: rebuilt state after replay");
spacetimedb_datastore::locking_tx_datastore::apply_history(datastore, database_identity, history, counters)?;
Ok(())
}
@@ -24,7 +24,7 @@ use crate::{
},
};
use anyhow::anyhow;
use core::{cell::RefCell, ops::RangeBounds};
use core::ops::RangeBounds;
use parking_lot::{Mutex, RwLock};
use spacetimedb_data_structures::map::{HashCollectionExt, HashMap};
use spacetimedb_durability::TxOffset;
@@ -33,7 +33,7 @@ use spacetimedb_lib::{ConnectionId, Identity};
use spacetimedb_paths::server::SnapshotDirPath;
use spacetimedb_primitives::{ColId, ColList, ConstraintId, IndexId, SequenceId, TableId, ViewId};
use spacetimedb_sats::memory_usage::MemoryUsage;
use spacetimedb_sats::{bsatn, AlgebraicValue, ProductValue};
use spacetimedb_sats::{AlgebraicValue, ProductValue};
use spacetimedb_schema::table_name::TableName;
use spacetimedb_schema::{
reducer_name::ReducerName,
@@ -48,7 +48,6 @@ use spacetimedb_table::{
use std::borrow::Cow;
use std::sync::Arc;
use std::time::{Duration, Instant};
use thiserror::Error;
pub type Result<T> = std::result::Result<T, DatastoreError>;
@@ -171,13 +170,9 @@ impl Locking {
/// The provided closure will be called for each transaction found in the
/// history, the parameter is the transaction's offset. The closure is called
/// _before_ the transaction is applied to the database state.
pub fn replay<F: FnMut(u64)>(&self, progress: F, error_behavior: ErrorBehavior) -> Replay<F> {
Replay {
database_identity: self.database_identity,
committed_state: self.committed_state.clone(),
progress: RefCell::new(progress),
error_behavior,
}
pub fn replay<F: FnMut(u64)>(&self, progress: F, error_behavior: ErrorBehavior) -> Replay<'_, F> {
let committed_state = self.committed_state.write();
Replay::new(self.database_identity, committed_state, progress, error_behavior)
}
/// Construct a new [`Locking`] datastore containing the state stored in `snapshot`.
@@ -999,18 +994,6 @@ impl Locking {
}
}
#[derive(Debug, Error)]
pub enum ReplayError {
#[error("Expected tx offset {expected}, encountered {encountered}")]
InvalidOffset { expected: u64, encountered: u64 },
#[error(transparent)]
Decode(#[from] bsatn::DecodeError),
#[error(transparent)]
Db(#[from] DatastoreError),
#[error(transparent)]
Any(#[from] anyhow::Error),
}
/// Construct a [`Metadata`] from the given [`RowRef`],
/// reading only the columns necessary to construct the value.
fn metadata_from_row(row: RowRef<'_>) -> Result<Metadata> {
@@ -1041,7 +1024,6 @@ pub(crate) mod tests {
};
use crate::traits::{IsolationLevel, MutTx};
use crate::Result;
use bsatn::to_vec;
use core::{fmt, mem};
use itertools::Itertools;
use pretty_assertions::{assert_eq, assert_matches};
@@ -1053,7 +1035,7 @@ pub(crate) mod tests {
use spacetimedb_lib::{resolved_type_via_v9, ScheduleAt, TimeDuration};
use spacetimedb_primitives::{col_list, ArgId, ColId, ScheduleId, ViewId};
use spacetimedb_sats::algebraic_value::ser::value_serialize;
use spacetimedb_sats::bsatn::ToBsatn;
use spacetimedb_sats::bsatn::{to_vec, ToBsatn};
use spacetimedb_sats::layout::RowTypeLayout;
use spacetimedb_sats::raw_identifier::RawIdentifier;
use spacetimedb_sats::{product, AlgebraicType, GroundSpacetimeType, SumTypeVariant, SumValue};
@@ -9,7 +9,7 @@ pub mod state_view;
pub use state_view::{IterByColEqTx, IterByColRangeTx};
pub mod delete_table;
mod replay;
pub use replay::{ErrorBehavior, Replay};
pub use replay::{apply_history, ApplyHistoryCounters, ErrorBehavior, Replay};
mod tx;
pub use tx::{NumDistinctValues, TxId};
mod tx_state;
@@ -1,46 +1,137 @@
use super::committed_state::CommittedState;
use super::datastore::Result;
use super::datastore::{Locking, Result};
use crate::db_metrics::DB_METRICS;
use crate::error::{IndexError, TableError};
use crate::locking_tx_datastore::datastore::ReplayError;
use crate::locking_tx_datastore::state_view::iter_st_column_for_table;
use crate::locking_tx_datastore::state_view::StateView;
use crate::system_tables::{is_built_in_meta_row, StFields as _};
use crate::system_tables::{StColumnRow, StTableFields, StTableRow, ST_COLUMN_ID, ST_TABLE_ID};
use crate::error::{DatastoreError, IndexError, TableError};
use crate::locking_tx_datastore::state_view::{iter_st_column_for_table, StateView};
use crate::system_tables::{
is_built_in_meta_row, StColumnRow, StFields as _, StTableFields, StTableRow, ST_COLUMN_ID, ST_TABLE_ID,
};
use anyhow::{anyhow, Context};
use core::cell::RefMut;
use core::ops::{Deref, DerefMut, RangeBounds};
use parking_lot::{RwLock, RwLockReadGuard};
use parking_lot::RwLockWriteGuard;
use prometheus::core::{AtomicF64, GenericGauge};
use prometheus::IntGauge;
use spacetimedb_commitlog::payload::txdata;
use spacetimedb_data_structures::map::{HashSet, IntMap, IntSet};
use spacetimedb_durability::History;
use spacetimedb_durability::Txdata;
use spacetimedb_lib::Identity;
use spacetimedb_primitives::{ColId, ColList, TableId};
use spacetimedb_sats::algebraic_value::de::ValueDeserializer;
use spacetimedb_sats::buffer::BufReader;
use spacetimedb_sats::{AlgebraicValue, Deserialize, ProductValue};
use spacetimedb_sats::{bsatn, AlgebraicValue, Deserialize, ProductValue};
use spacetimedb_schema::schema::{ColumnSchema, TableSchema};
use spacetimedb_schema::table_name::TableName;
use spacetimedb_table::indexes::RowPointer;
use spacetimedb_table::table::{InsertError, RowRef};
use std::cell::RefCell;
use std::sync::Arc;
use thiserror::Error;
/// Applies the transaction `history` to `datastore`, replaying every
/// transaction from the datastore's current offset onward, then rebuilds
/// derived state via `rebuild_state_after_replay`.
///
/// Progress is logged under `database_identity`; elapsed wall-clock time and
/// the number of commits applied are recorded into `counters`.
///
/// Returns an error if decoding/applying any transaction fails (replay uses
/// `ErrorBehavior::FailFast`) or if the post-replay rebuild fails.
pub fn apply_history(
datastore: &Locking,
database_identity: Identity,
history: impl History<TxData = Txdata<ProductValue>>,
counters: ApplyHistoryCounters,
) -> Result<()> {
log::info!("[{database_identity}] DATABASE: applying transaction history...");
// TODO: Revisit once we actually replay history suffixes, ie. starting
// from an offset larger than the history's min offset.
// TODO: We may want to require that a `tokio::runtime::Handle` is
// always supplied when constructing a `RelationalDB`. This would allow
// to spawn a timer task here which just prints the progress periodically
// in case the history is finite but very long.
let (_, max_tx_offset) = history.tx_range_hint();
let mut last_logged_percentage = 0;
// Progress callback invoked per transaction offset. When the history's
// upper bound is known, log at every 10% milestone; otherwise log every
// 10,000 transactions so long replays still show signs of life.
let progress = |tx_offset: u64| {
if let Some(max_tx_offset) = max_tx_offset {
let percentage = f64::floor((tx_offset as f64 / max_tx_offset as f64) * 100.0) as i32;
if percentage > last_logged_percentage && percentage % 10 == 0 {
log::info!("[{database_identity}] Loaded {percentage}% ({tx_offset}/{max_tx_offset})");
last_logged_percentage = percentage;
}
// Print _something_ even if we don't know what's still ahead.
} else if tx_offset.is_multiple_of(10_000) {
log::info!("[{database_identity}] Loading transaction {tx_offset}");
}
};
let time_before = std::time::Instant::now();
let mut replay = datastore.replay(
progress,
// We don't want to instantiate an incorrect state;
// if the commitlog contains an inconsistency we'd rather get a hard error than showing customers incorrect data.
ErrorBehavior::FailFast,
);
let start_tx_offset = replay.next_tx_offset();
history
.fold_transactions_from(start_tx_offset, &mut replay)
.map_err(anyhow::Error::from)?;
let time_elapsed = time_before.elapsed();
counters.replay_commitlog_time_seconds.set(time_elapsed.as_secs_f64());
let end_tx_offset = replay.next_tx_offset();
counters
.replay_commitlog_num_commits
.set((end_tx_offset - start_tx_offset) as _);
log::info!("[{database_identity}] DATABASE: applied transaction history");
// Necessary to avoid a deadlock: `replay` holds the committed-state write
// lock (see `Replay::new`), and `rebuild_state_after_replay` presumably
// re-acquires it — TODO confirm.
drop(replay);
datastore.rebuild_state_after_replay()?;
log::info!("[{database_identity}] DATABASE: rebuilt state after replay");
Ok(())
}
/// Metric gauges that [`apply_history`] records replay statistics into.
///
/// The caller constructs these (e.g. with per-database label values) so this
/// module stays decoupled from any particular metrics registry.
pub struct ApplyHistoryCounters {
// Wall-clock seconds spent folding the history into the datastore.
pub replay_commitlog_time_seconds: GenericGauge<AtomicF64>,
// Number of commits applied during this replay (end offset minus start offset).
pub replay_commitlog_num_commits: IntGauge,
}
/// Errors that can occur while replaying a transaction history.
#[derive(Debug, Error)]
pub enum ReplayError {
/// The history yielded a transaction at an offset other than the one expected next.
#[error("Expected tx offset {expected}, encountered {encountered}")]
InvalidOffset { expected: u64, encountered: u64 },
/// A row or record in the commitlog failed to BSATN-decode.
#[error(transparent)]
Decode(#[from] bsatn::DecodeError),
/// An error raised by the datastore while applying a transaction.
#[error(transparent)]
Db(#[from] DatastoreError),
/// Any other error, e.g. from history iteration.
#[error(transparent)]
Any(#[from] anyhow::Error),
}
/// A [`spacetimedb_commitlog::Decoder`] suitable for replaying a transaction
/// history into the database state.
pub struct Replay<F> {
pub(super) database_identity: Identity,
pub(super) committed_state: Arc<RwLock<CommittedState>>,
pub(super) progress: RefCell<F>,
pub(super) error_behavior: ErrorBehavior,
pub struct Replay<'a, F> {
// Identity of the database being replayed into; used for log messages.
database_identity: Identity,
// The single `ReplayCommittedState` used for the whole replay, wrapping a
// write guard on the committed state held for the lifetime of this value.
// `RefCell` allows interior mutability through `&self` visitor callbacks.
committed_state: RefCell<ReplayCommittedState<'a>>,
// Caller-supplied progress callback, invoked with each transaction offset.
progress: RefCell<F>,
// How to react to errors encountered during replay (fail fast vs. warn).
error_behavior: ErrorBehavior,
}
impl<F> Replay<F> {
fn using_visitor<T>(&self, f: impl FnOnce(&mut ReplayVisitor<'_, F>) -> T) -> T {
let mut committed_state = self.committed_state.write();
let state = &mut *committed_state;
let committed_state = ReplayCommittedState::new(state);
impl<'a, F> Replay<'a, F> {
/// Constructs a `Replay` from a held write guard on the committed state.
///
/// The guard is wrapped in a single [`ReplayCommittedState`] that lives for
/// the whole replay; the committed-state lock therefore stays held until
/// this `Replay` is dropped.
pub fn new(
database_identity: Identity,
committed_state: RwLockWriteGuard<'a, CommittedState>,
progress: F,
error_behavior: ErrorBehavior,
) -> Self {
Self {
database_identity,
committed_state: RefCell::new(ReplayCommittedState::new(committed_state)),
progress: RefCell::new(progress),
error_behavior,
}
}
fn using_visitor<T>(&self, f: impl FnOnce(&mut ReplayVisitor<'_, '_, F>) -> T) -> T {
let mut visitor = ReplayVisitor {
database_identity: &self.database_identity,
committed_state,
committed_state: &mut self.committed_state.borrow_mut(),
progress: &mut *self.progress.borrow_mut(),
dropped_table_names: IntMap::default(),
error_behavior: self.error_behavior,
@@ -49,16 +140,16 @@ impl<F> Replay<F> {
}
pub fn next_tx_offset(&self) -> u64 {
self.committed_state.read_arc().next_tx_offset
self.committed_state.borrow().next_tx_offset
}
// NOTE: This is not unused.
pub fn committed_state(&self) -> RwLockReadGuard<'_, CommittedState> {
self.committed_state.read()
pub fn committed_state(&self) -> RefMut<'_, ReplayCommittedState<'a>> {
self.committed_state.borrow_mut()
}
}
impl<F: FnMut(u64)> spacetimedb_commitlog::Decoder for &mut Replay<F> {
impl<F: FnMut(u64)> spacetimedb_commitlog::Decoder for &mut Replay<'_, F> {
type Record = txdata::Txdata<ProductValue>;
type Error = txdata::DecoderError<ReplayError>;
@@ -139,9 +230,9 @@ pub enum ErrorBehavior {
Warn,
}
struct ReplayVisitor<'a, F> {
struct ReplayVisitor<'a, 'cs, F> {
database_identity: &'a Identity,
committed_state: ReplayCommittedState<'a>,
committed_state: &'a mut ReplayCommittedState<'cs>,
progress: &'a mut F,
// Since deletes are handled before truncation / drop, sometimes the schema
// info is gone. We save the name on the first delete of that table so metrics
@@ -150,7 +241,7 @@ struct ReplayVisitor<'a, F> {
error_behavior: ErrorBehavior,
}
impl<F> ReplayVisitor<'_, F> {
impl<F> ReplayVisitor<'_, '_, F> {
/// Process `err` according to `self.error_behavior`,
/// either warning about it or returning it.
///
@@ -166,7 +257,7 @@ impl<F> ReplayVisitor<'_, F> {
}
}
impl<F: FnMut(u64)> spacetimedb_commitlog::payload::txdata::Visitor for ReplayVisitor<'_, F> {
impl<F: FnMut(u64)> spacetimedb_commitlog::payload::txdata::Visitor for ReplayVisitor<'_, '_, F> {
type Error = ReplayError;
// NOTE: Technically, this could be `()` if and when we can extract the
// row data without going through `ProductValue` (PV).
@@ -314,9 +405,9 @@ impl<F: FnMut(u64)> spacetimedb_commitlog::payload::txdata::Visitor for ReplayVi
}
/// A `CommittedState` under construction during replay.
struct ReplayCommittedState<'cs> {
/// The committed state being contructed.
state: &'cs mut CommittedState,
pub struct ReplayCommittedState<'cs> {
/// The committed state being constructed.
state: RwLockWriteGuard<'cs, CommittedState>,
/// Whether the table was dropped within the current transaction during replay.
///
@@ -361,25 +452,25 @@ struct ReplayCommittedState<'cs> {
///
/// [`RowPointer`]s from this set are passed to the `unsafe` [`Table::get_row_ref_unchecked`],
/// so it's important to properly maintain only [`RowPointer`]s to valid, extant, non-deleted rows.
pub(super) replay_table_updated: IntMap<TableId, RowPointer>,
replay_table_updated: IntMap<TableId, RowPointer>,
}
impl Deref for ReplayCommittedState<'_> {
type Target = CommittedState;
fn deref(&self) -> &Self::Target {
self.state
&self.state
}
}
impl DerefMut for ReplayCommittedState<'_> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut *self.state
&mut self.state
}
}
impl<'cs> ReplayCommittedState<'cs> {
fn new(state: &'cs mut CommittedState) -> Self {
fn new(state: RwLockWriteGuard<'cs, CommittedState>) -> Self {
Self {
state,
replay_table_dropped: <_>::default(),
@@ -512,7 +603,7 @@ impl<'cs> ReplayCommittedState<'cs> {
let target_col_id = ColId::deserialize(ValueDeserializer::from_ref(&st_column_row.elements[1]))
.expect("second field in `st_column` should decode to a `ColId`");
let outdated_st_column_rows = iter_st_column_for_table(self.state, &target_table_id.into())?
let outdated_st_column_rows = iter_st_column_for_table(self, &target_table_id.into())?
.filter_map(|row_ref| {
StColumnRow::try_from(row_ref)
.map(|c| (c.col_pos == target_col_id && row_ref.pointer() != row_ptr).then(|| row_ref.pointer()))
@@ -542,7 +633,7 @@ impl<'cs> ReplayCommittedState<'cs> {
// and not the other one, as it is being replaced.
// `Self::ignore_previous_version_of_column` has marked the old version as ignored,
// so filter only the non-ignored columns.
let mut columns = iter_st_column_for_table(self.state, &table_id.into())?
let mut columns = iter_st_column_for_table(self, &table_id.into())?
.filter(|row_ref| !self.replay_columns_to_ignore.contains(&row_ref.pointer()))
.map(|row_ref| {
let row = StColumnRow::try_from(row_ref)?;
@@ -766,7 +857,7 @@ mod tests {
// Directly call replay_insert on committed state.
let row = u32_str_u32(1, "Carol", 40);
{
let state = &mut *datastore.committed_state.write();
let state = datastore.committed_state.write();
let mut committed_state = ReplayCommittedState::new(state);
committed_state.replay_insert(table_id, &schema, &row)?;
}