Replay: some code motion & reuse ReplayCommittedState (#4849)

# Description of Changes

First two commits are code motion.
The second commit fixes a mistake I made in a previous PR that made us
use potentially several `ReplayCommittedState`s per
`datastore.replay(..)`.

More to come in terms of PRs; stay tuned.

# API and ABI breaking changes

None

# Expected complexity level and risk

3
This commit is contained in:
Mazdak Farrokhzad
2026-04-22 13:23:06 +02:00
committed by GitHub
parent 91494c9cf2
commit d639be0af6
4 changed files with 150 additions and 119 deletions
+14 -56
View File
@@ -17,7 +17,7 @@ use spacetimedb_datastore::locking_tx_datastore::datastore::TxMetrics;
use spacetimedb_datastore::locking_tx_datastore::state_view::{ use spacetimedb_datastore::locking_tx_datastore::state_view::{
IterByColEqMutTx, IterByColRangeMutTx, IterMutTx, StateView, IterByColEqMutTx, IterByColRangeMutTx, IterMutTx, StateView,
}; };
use spacetimedb_datastore::locking_tx_datastore::{IndexScanPointOrRange, MutTxId, TxId}; use spacetimedb_datastore::locking_tx_datastore::{ApplyHistoryCounters, IndexScanPointOrRange, MutTxId, TxId};
use spacetimedb_datastore::system_tables::{ use spacetimedb_datastore::system_tables::{
system_tables, StModuleRow, ST_CLIENT_ID, ST_CONNECTION_CREDENTIALS_ID, ST_VIEW_SUB_ID, system_tables, StModuleRow, ST_CLIENT_ID, ST_CONNECTION_CREDENTIALS_ID, ST_VIEW_SUB_ID,
}; };
@@ -1617,62 +1617,20 @@ impl RelationalDB {
} }
} }
fn apply_history<H>(datastore: &Locking, database_identity: Identity, history: H) -> Result<(), DBError> fn apply_history(
where datastore: &Locking,
H: durability::History<TxData = Txdata>, database_identity: Identity,
{ history: impl durability::History<TxData = Txdata>,
log::info!("[{database_identity}] DATABASE: applying transaction history..."); ) -> Result<(), DBError> {
let counters = ApplyHistoryCounters {
// TODO: Revisit once we actually replay history suffixes, ie. starting replay_commitlog_time_seconds: WORKER_METRICS
// from an offset larger than the history's min offset. .replay_commitlog_time_seconds
// TODO: We may want to require that a `tokio::runtime::Handle` is .with_label_values(&database_identity),
// always supplied when constructing a `RelationalDB`. This would allow replay_commitlog_num_commits: WORKER_METRICS
// to spawn a timer task here which just prints the progress periodically .replay_commitlog_num_commits
// in case the history is finite but very long. .with_label_values(&database_identity),
let (_, max_tx_offset) = history.tx_range_hint();
let mut last_logged_percentage = 0;
let progress = |tx_offset: u64| {
if let Some(max_tx_offset) = max_tx_offset {
let percentage = f64::floor((tx_offset as f64 / max_tx_offset as f64) * 100.0) as i32;
if percentage > last_logged_percentage && percentage % 10 == 0 {
log::info!("[{database_identity}] Loaded {percentage}% ({tx_offset}/{max_tx_offset})");
last_logged_percentage = percentage;
}
// Print _something_ even if we don't know what's still ahead.
} else if tx_offset.is_multiple_of(10_000) {
log::info!("[{database_identity}] Loading transaction {tx_offset}");
}
}; };
spacetimedb_datastore::locking_tx_datastore::apply_history(datastore, database_identity, history, counters)?;
let time_before = std::time::Instant::now();
let mut replay = datastore.replay(
progress,
// We don't want to instantiate an incorrect state;
// if the commitlog contains an inconsistency we'd rather get a hard error than showing customers incorrect data.
spacetimedb_datastore::locking_tx_datastore::ErrorBehavior::FailFast,
);
let start_tx_offset = replay.next_tx_offset();
history
.fold_transactions_from(start_tx_offset, &mut replay)
.map_err(anyhow::Error::from)?;
let time_elapsed = time_before.elapsed();
WORKER_METRICS
.replay_commitlog_time_seconds
.with_label_values(&database_identity)
.set(time_elapsed.as_secs_f64());
let end_tx_offset = replay.next_tx_offset();
WORKER_METRICS
.replay_commitlog_num_commits
.with_label_values(&database_identity)
.set((end_tx_offset - start_tx_offset) as _);
log::info!("[{database_identity}] DATABASE: applied transaction history");
datastore.rebuild_state_after_replay()?;
log::info!("[{database_identity}] DATABASE: rebuilt state after replay");
Ok(()) Ok(())
} }
@@ -24,7 +24,7 @@ use crate::{
}, },
}; };
use anyhow::anyhow; use anyhow::anyhow;
use core::{cell::RefCell, ops::RangeBounds}; use core::ops::RangeBounds;
use parking_lot::{Mutex, RwLock}; use parking_lot::{Mutex, RwLock};
use spacetimedb_data_structures::map::{HashCollectionExt, HashMap}; use spacetimedb_data_structures::map::{HashCollectionExt, HashMap};
use spacetimedb_durability::TxOffset; use spacetimedb_durability::TxOffset;
@@ -33,7 +33,7 @@ use spacetimedb_lib::{ConnectionId, Identity};
use spacetimedb_paths::server::SnapshotDirPath; use spacetimedb_paths::server::SnapshotDirPath;
use spacetimedb_primitives::{ColId, ColList, ConstraintId, IndexId, SequenceId, TableId, ViewId}; use spacetimedb_primitives::{ColId, ColList, ConstraintId, IndexId, SequenceId, TableId, ViewId};
use spacetimedb_sats::memory_usage::MemoryUsage; use spacetimedb_sats::memory_usage::MemoryUsage;
use spacetimedb_sats::{bsatn, AlgebraicValue, ProductValue}; use spacetimedb_sats::{AlgebraicValue, ProductValue};
use spacetimedb_schema::table_name::TableName; use spacetimedb_schema::table_name::TableName;
use spacetimedb_schema::{ use spacetimedb_schema::{
reducer_name::ReducerName, reducer_name::ReducerName,
@@ -48,7 +48,6 @@ use spacetimedb_table::{
use std::borrow::Cow; use std::borrow::Cow;
use std::sync::Arc; use std::sync::Arc;
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use thiserror::Error;
pub type Result<T> = std::result::Result<T, DatastoreError>; pub type Result<T> = std::result::Result<T, DatastoreError>;
@@ -171,13 +170,9 @@ impl Locking {
/// The provided closure will be called for each transaction found in the /// The provided closure will be called for each transaction found in the
/// history, the parameter is the transaction's offset. The closure is called /// history, the parameter is the transaction's offset. The closure is called
/// _before_ the transaction is applied to the database state. /// _before_ the transaction is applied to the database state.
pub fn replay<F: FnMut(u64)>(&self, progress: F, error_behavior: ErrorBehavior) -> Replay<F> { pub fn replay<F: FnMut(u64)>(&self, progress: F, error_behavior: ErrorBehavior) -> Replay<'_, F> {
Replay { let committed_state = self.committed_state.write();
database_identity: self.database_identity, Replay::new(self.database_identity, committed_state, progress, error_behavior)
committed_state: self.committed_state.clone(),
progress: RefCell::new(progress),
error_behavior,
}
} }
/// Construct a new [`Locking`] datastore containing the state stored in `snapshot`. /// Construct a new [`Locking`] datastore containing the state stored in `snapshot`.
@@ -999,18 +994,6 @@ impl Locking {
} }
} }
#[derive(Debug, Error)]
pub enum ReplayError {
#[error("Expected tx offset {expected}, encountered {encountered}")]
InvalidOffset { expected: u64, encountered: u64 },
#[error(transparent)]
Decode(#[from] bsatn::DecodeError),
#[error(transparent)]
Db(#[from] DatastoreError),
#[error(transparent)]
Any(#[from] anyhow::Error),
}
/// Construct a [`Metadata`] from the given [`RowRef`], /// Construct a [`Metadata`] from the given [`RowRef`],
/// reading only the columns necessary to construct the value. /// reading only the columns necessary to construct the value.
fn metadata_from_row(row: RowRef<'_>) -> Result<Metadata> { fn metadata_from_row(row: RowRef<'_>) -> Result<Metadata> {
@@ -1041,7 +1024,6 @@ pub(crate) mod tests {
}; };
use crate::traits::{IsolationLevel, MutTx}; use crate::traits::{IsolationLevel, MutTx};
use crate::Result; use crate::Result;
use bsatn::to_vec;
use core::{fmt, mem}; use core::{fmt, mem};
use itertools::Itertools; use itertools::Itertools;
use pretty_assertions::{assert_eq, assert_matches}; use pretty_assertions::{assert_eq, assert_matches};
@@ -1053,7 +1035,7 @@ pub(crate) mod tests {
use spacetimedb_lib::{resolved_type_via_v9, ScheduleAt, TimeDuration}; use spacetimedb_lib::{resolved_type_via_v9, ScheduleAt, TimeDuration};
use spacetimedb_primitives::{col_list, ArgId, ColId, ScheduleId, ViewId}; use spacetimedb_primitives::{col_list, ArgId, ColId, ScheduleId, ViewId};
use spacetimedb_sats::algebraic_value::ser::value_serialize; use spacetimedb_sats::algebraic_value::ser::value_serialize;
use spacetimedb_sats::bsatn::ToBsatn; use spacetimedb_sats::bsatn::{to_vec, ToBsatn};
use spacetimedb_sats::layout::RowTypeLayout; use spacetimedb_sats::layout::RowTypeLayout;
use spacetimedb_sats::raw_identifier::RawIdentifier; use spacetimedb_sats::raw_identifier::RawIdentifier;
use spacetimedb_sats::{product, AlgebraicType, GroundSpacetimeType, SumTypeVariant, SumValue}; use spacetimedb_sats::{product, AlgebraicType, GroundSpacetimeType, SumTypeVariant, SumValue};
@@ -9,7 +9,7 @@ pub mod state_view;
pub use state_view::{IterByColEqTx, IterByColRangeTx}; pub use state_view::{IterByColEqTx, IterByColRangeTx};
pub mod delete_table; pub mod delete_table;
mod replay; mod replay;
pub use replay::{ErrorBehavior, Replay}; pub use replay::{apply_history, ApplyHistoryCounters, ErrorBehavior, Replay};
mod tx; mod tx;
pub use tx::{NumDistinctValues, TxId}; pub use tx::{NumDistinctValues, TxId};
mod tx_state; mod tx_state;
@@ -1,46 +1,137 @@
use super::committed_state::CommittedState; use super::committed_state::CommittedState;
use super::datastore::Result; use super::datastore::{Locking, Result};
use crate::db_metrics::DB_METRICS; use crate::db_metrics::DB_METRICS;
use crate::error::{IndexError, TableError}; use crate::error::{DatastoreError, IndexError, TableError};
use crate::locking_tx_datastore::datastore::ReplayError; use crate::locking_tx_datastore::state_view::{iter_st_column_for_table, StateView};
use crate::locking_tx_datastore::state_view::iter_st_column_for_table; use crate::system_tables::{
use crate::locking_tx_datastore::state_view::StateView; is_built_in_meta_row, StColumnRow, StFields as _, StTableFields, StTableRow, ST_COLUMN_ID, ST_TABLE_ID,
use crate::system_tables::{is_built_in_meta_row, StFields as _}; };
use crate::system_tables::{StColumnRow, StTableFields, StTableRow, ST_COLUMN_ID, ST_TABLE_ID};
use anyhow::{anyhow, Context}; use anyhow::{anyhow, Context};
use core::cell::RefMut;
use core::ops::{Deref, DerefMut, RangeBounds}; use core::ops::{Deref, DerefMut, RangeBounds};
use parking_lot::{RwLock, RwLockReadGuard}; use parking_lot::RwLockWriteGuard;
use prometheus::core::{AtomicF64, GenericGauge};
use prometheus::IntGauge;
use spacetimedb_commitlog::payload::txdata; use spacetimedb_commitlog::payload::txdata;
use spacetimedb_data_structures::map::{HashSet, IntMap, IntSet}; use spacetimedb_data_structures::map::{HashSet, IntMap, IntSet};
use spacetimedb_durability::History;
use spacetimedb_durability::Txdata;
use spacetimedb_lib::Identity; use spacetimedb_lib::Identity;
use spacetimedb_primitives::{ColId, ColList, TableId}; use spacetimedb_primitives::{ColId, ColList, TableId};
use spacetimedb_sats::algebraic_value::de::ValueDeserializer; use spacetimedb_sats::algebraic_value::de::ValueDeserializer;
use spacetimedb_sats::buffer::BufReader; use spacetimedb_sats::buffer::BufReader;
use spacetimedb_sats::{AlgebraicValue, Deserialize, ProductValue}; use spacetimedb_sats::{bsatn, AlgebraicValue, Deserialize, ProductValue};
use spacetimedb_schema::schema::{ColumnSchema, TableSchema}; use spacetimedb_schema::schema::{ColumnSchema, TableSchema};
use spacetimedb_schema::table_name::TableName; use spacetimedb_schema::table_name::TableName;
use spacetimedb_table::indexes::RowPointer; use spacetimedb_table::indexes::RowPointer;
use spacetimedb_table::table::{InsertError, RowRef}; use spacetimedb_table::table::{InsertError, RowRef};
use std::cell::RefCell; use std::cell::RefCell;
use std::sync::Arc; use std::sync::Arc;
use thiserror::Error;
pub fn apply_history(
datastore: &Locking,
database_identity: Identity,
history: impl History<TxData = Txdata<ProductValue>>,
counters: ApplyHistoryCounters,
) -> Result<()> {
log::info!("[{database_identity}] DATABASE: applying transaction history...");
// TODO: Revisit once we actually replay history suffixes, ie. starting
// from an offset larger than the history's min offset.
// TODO: We may want to require that a `tokio::runtime::Handle` is
// always supplied when constructing a `RelationalDB`. This would allow
// to spawn a timer task here which just prints the progress periodically
// in case the history is finite but very long.
let (_, max_tx_offset) = history.tx_range_hint();
let mut last_logged_percentage = 0;
let progress = |tx_offset: u64| {
if let Some(max_tx_offset) = max_tx_offset {
let percentage = f64::floor((tx_offset as f64 / max_tx_offset as f64) * 100.0) as i32;
if percentage > last_logged_percentage && percentage % 10 == 0 {
log::info!("[{database_identity}] Loaded {percentage}% ({tx_offset}/{max_tx_offset})");
last_logged_percentage = percentage;
}
// Print _something_ even if we don't know what's still ahead.
} else if tx_offset.is_multiple_of(10_000) {
log::info!("[{database_identity}] Loading transaction {tx_offset}");
}
};
let time_before = std::time::Instant::now();
let mut replay = datastore.replay(
progress,
// We don't want to instantiate an incorrect state;
// if the commitlog contains an inconsistency we'd rather get a hard error than showing customers incorrect data.
ErrorBehavior::FailFast,
);
let start_tx_offset = replay.next_tx_offset();
history
.fold_transactions_from(start_tx_offset, &mut replay)
.map_err(anyhow::Error::from)?;
let time_elapsed = time_before.elapsed();
counters.replay_commitlog_time_seconds.set(time_elapsed.as_secs_f64());
let end_tx_offset = replay.next_tx_offset();
counters
.replay_commitlog_num_commits
.set((end_tx_offset - start_tx_offset) as _);
log::info!("[{database_identity}] DATABASE: applied transaction history");
drop(replay); // Neccessary to avoid a deadlock.
datastore.rebuild_state_after_replay()?;
log::info!("[{database_identity}] DATABASE: rebuilt state after replay");
Ok(())
}
pub struct ApplyHistoryCounters {
pub replay_commitlog_time_seconds: GenericGauge<AtomicF64>,
pub replay_commitlog_num_commits: IntGauge,
}
#[derive(Debug, Error)]
pub enum ReplayError {
#[error("Expected tx offset {expected}, encountered {encountered}")]
InvalidOffset { expected: u64, encountered: u64 },
#[error(transparent)]
Decode(#[from] bsatn::DecodeError),
#[error(transparent)]
Db(#[from] DatastoreError),
#[error(transparent)]
Any(#[from] anyhow::Error),
}
/// A [`spacetimedb_commitlog::Decoder`] suitable for replaying a transaction /// A [`spacetimedb_commitlog::Decoder`] suitable for replaying a transaction
/// history into the database state. /// history into the database state.
pub struct Replay<F> { pub struct Replay<'a, F> {
pub(super) database_identity: Identity, database_identity: Identity,
pub(super) committed_state: Arc<RwLock<CommittedState>>, committed_state: RefCell<ReplayCommittedState<'a>>,
pub(super) progress: RefCell<F>, progress: RefCell<F>,
pub(super) error_behavior: ErrorBehavior, error_behavior: ErrorBehavior,
} }
impl<F> Replay<F> { impl<'a, F> Replay<'a, F> {
fn using_visitor<T>(&self, f: impl FnOnce(&mut ReplayVisitor<'_, F>) -> T) -> T { pub fn new(
let mut committed_state = self.committed_state.write(); database_identity: Identity,
let state = &mut *committed_state; committed_state: RwLockWriteGuard<'a, CommittedState>,
let committed_state = ReplayCommittedState::new(state); progress: F,
error_behavior: ErrorBehavior,
) -> Self {
Self {
database_identity,
committed_state: RefCell::new(ReplayCommittedState::new(committed_state)),
progress: RefCell::new(progress),
error_behavior,
}
}
fn using_visitor<T>(&self, f: impl FnOnce(&mut ReplayVisitor<'_, '_, F>) -> T) -> T {
let mut visitor = ReplayVisitor { let mut visitor = ReplayVisitor {
database_identity: &self.database_identity, database_identity: &self.database_identity,
committed_state, committed_state: &mut self.committed_state.borrow_mut(),
progress: &mut *self.progress.borrow_mut(), progress: &mut *self.progress.borrow_mut(),
dropped_table_names: IntMap::default(), dropped_table_names: IntMap::default(),
error_behavior: self.error_behavior, error_behavior: self.error_behavior,
@@ -49,16 +140,16 @@ impl<F> Replay<F> {
} }
pub fn next_tx_offset(&self) -> u64 { pub fn next_tx_offset(&self) -> u64 {
self.committed_state.read_arc().next_tx_offset self.committed_state.borrow().next_tx_offset
} }
// NOTE: This is not unused. // NOTE: This is not unused.
pub fn committed_state(&self) -> RwLockReadGuard<'_, CommittedState> { pub fn committed_state(&self) -> RefMut<'_, ReplayCommittedState<'a>> {
self.committed_state.read() self.committed_state.borrow_mut()
} }
} }
impl<F: FnMut(u64)> spacetimedb_commitlog::Decoder for &mut Replay<F> { impl<F: FnMut(u64)> spacetimedb_commitlog::Decoder for &mut Replay<'_, F> {
type Record = txdata::Txdata<ProductValue>; type Record = txdata::Txdata<ProductValue>;
type Error = txdata::DecoderError<ReplayError>; type Error = txdata::DecoderError<ReplayError>;
@@ -139,9 +230,9 @@ pub enum ErrorBehavior {
Warn, Warn,
} }
struct ReplayVisitor<'a, F> { struct ReplayVisitor<'a, 'cs, F> {
database_identity: &'a Identity, database_identity: &'a Identity,
committed_state: ReplayCommittedState<'a>, committed_state: &'a mut ReplayCommittedState<'cs>,
progress: &'a mut F, progress: &'a mut F,
// Since deletes are handled before truncation / drop, sometimes the schema // Since deletes are handled before truncation / drop, sometimes the schema
// info is gone. We save the name on the first delete of that table so metrics // info is gone. We save the name on the first delete of that table so metrics
@@ -150,7 +241,7 @@ struct ReplayVisitor<'a, F> {
error_behavior: ErrorBehavior, error_behavior: ErrorBehavior,
} }
impl<F> ReplayVisitor<'_, F> { impl<F> ReplayVisitor<'_, '_, F> {
/// Process `err` according to `self.error_behavior`, /// Process `err` according to `self.error_behavior`,
/// either warning about it or returning it. /// either warning about it or returning it.
/// ///
@@ -166,7 +257,7 @@ impl<F> ReplayVisitor<'_, F> {
} }
} }
impl<F: FnMut(u64)> spacetimedb_commitlog::payload::txdata::Visitor for ReplayVisitor<'_, F> { impl<F: FnMut(u64)> spacetimedb_commitlog::payload::txdata::Visitor for ReplayVisitor<'_, '_, F> {
type Error = ReplayError; type Error = ReplayError;
// NOTE: Technically, this could be `()` if and when we can extract the // NOTE: Technically, this could be `()` if and when we can extract the
// row data without going through `ProductValue` (PV). // row data without going through `ProductValue` (PV).
@@ -314,9 +405,9 @@ impl<F: FnMut(u64)> spacetimedb_commitlog::payload::txdata::Visitor for ReplayVi
} }
/// A `CommittedState` under construction during replay. /// A `CommittedState` under construction during replay.
struct ReplayCommittedState<'cs> { pub struct ReplayCommittedState<'cs> {
/// The committed state being contructed. /// The committed state being constructed.
state: &'cs mut CommittedState, state: RwLockWriteGuard<'cs, CommittedState>,
/// Whether the table was dropped within the current transaction during replay. /// Whether the table was dropped within the current transaction during replay.
/// ///
@@ -361,25 +452,25 @@ struct ReplayCommittedState<'cs> {
/// ///
/// [`RowPointer`]s from this set are passed to the `unsafe` [`Table::get_row_ref_unchecked`], /// [`RowPointer`]s from this set are passed to the `unsafe` [`Table::get_row_ref_unchecked`],
/// so it's important to properly maintain only [`RowPointer`]s to valid, extant, non-deleted rows. /// so it's important to properly maintain only [`RowPointer`]s to valid, extant, non-deleted rows.
pub(super) replay_table_updated: IntMap<TableId, RowPointer>, replay_table_updated: IntMap<TableId, RowPointer>,
} }
impl Deref for ReplayCommittedState<'_> { impl Deref for ReplayCommittedState<'_> {
type Target = CommittedState; type Target = CommittedState;
fn deref(&self) -> &Self::Target { fn deref(&self) -> &Self::Target {
self.state &self.state
} }
} }
impl DerefMut for ReplayCommittedState<'_> { impl DerefMut for ReplayCommittedState<'_> {
fn deref_mut(&mut self) -> &mut Self::Target { fn deref_mut(&mut self) -> &mut Self::Target {
&mut *self.state &mut self.state
} }
} }
impl<'cs> ReplayCommittedState<'cs> { impl<'cs> ReplayCommittedState<'cs> {
fn new(state: &'cs mut CommittedState) -> Self { fn new(state: RwLockWriteGuard<'cs, CommittedState>) -> Self {
Self { Self {
state, state,
replay_table_dropped: <_>::default(), replay_table_dropped: <_>::default(),
@@ -512,7 +603,7 @@ impl<'cs> ReplayCommittedState<'cs> {
let target_col_id = ColId::deserialize(ValueDeserializer::from_ref(&st_column_row.elements[1])) let target_col_id = ColId::deserialize(ValueDeserializer::from_ref(&st_column_row.elements[1]))
.expect("second field in `st_column` should decode to a `ColId`"); .expect("second field in `st_column` should decode to a `ColId`");
let outdated_st_column_rows = iter_st_column_for_table(self.state, &target_table_id.into())? let outdated_st_column_rows = iter_st_column_for_table(self, &target_table_id.into())?
.filter_map(|row_ref| { .filter_map(|row_ref| {
StColumnRow::try_from(row_ref) StColumnRow::try_from(row_ref)
.map(|c| (c.col_pos == target_col_id && row_ref.pointer() != row_ptr).then(|| row_ref.pointer())) .map(|c| (c.col_pos == target_col_id && row_ref.pointer() != row_ptr).then(|| row_ref.pointer()))
@@ -542,7 +633,7 @@ impl<'cs> ReplayCommittedState<'cs> {
// and not the other one, as it is being replaced. // and not the other one, as it is being replaced.
// `Self::ignore_previous_version_of_column` has marked the old version as ignored, // `Self::ignore_previous_version_of_column` has marked the old version as ignored,
// so filter only the non-ignored columns. // so filter only the non-ignored columns.
let mut columns = iter_st_column_for_table(self.state, &table_id.into())? let mut columns = iter_st_column_for_table(self, &table_id.into())?
.filter(|row_ref| !self.replay_columns_to_ignore.contains(&row_ref.pointer())) .filter(|row_ref| !self.replay_columns_to_ignore.contains(&row_ref.pointer()))
.map(|row_ref| { .map(|row_ref| {
let row = StColumnRow::try_from(row_ref)?; let row = StColumnRow::try_from(row_ref)?;
@@ -766,7 +857,7 @@ mod tests {
// Directly call replay_insert on committed state. // Directly call replay_insert on committed state.
let row = u32_str_u32(1, "Carol", 40); let row = u32_str_u32(1, "Carol", 40);
{ {
let state = &mut *datastore.committed_state.write(); let state = datastore.committed_state.write();
let mut committed_state = ReplayCommittedState::new(state); let mut committed_state = ReplayCommittedState::new(state);
committed_state.replay_insert(table_id, &schema, &row)?; committed_state.replay_insert(table_id, &schema, &row)?;
} }