mirror of
https://github.com/clockworklabs/SpacetimeDB.git
synced 2026-05-06 07:26:43 -04:00
Replay: some code motion & reuse ReplayCommittedState (#4849)
# Description of Changes First two commits are code motion. The second commit fixes a mistake I made in a previous PR that made us use potentially several `ReplayCommittedState`s per `datastore.replay(..)`. More to come in terms of PRs; stay tuned. # API and ABI breaking changes None # Expected complexity level and risk 3
This commit is contained in:
committed by
GitHub
parent
91494c9cf2
commit
d639be0af6
@@ -17,7 +17,7 @@ use spacetimedb_datastore::locking_tx_datastore::datastore::TxMetrics;
|
||||
use spacetimedb_datastore::locking_tx_datastore::state_view::{
|
||||
IterByColEqMutTx, IterByColRangeMutTx, IterMutTx, StateView,
|
||||
};
|
||||
use spacetimedb_datastore::locking_tx_datastore::{IndexScanPointOrRange, MutTxId, TxId};
|
||||
use spacetimedb_datastore::locking_tx_datastore::{ApplyHistoryCounters, IndexScanPointOrRange, MutTxId, TxId};
|
||||
use spacetimedb_datastore::system_tables::{
|
||||
system_tables, StModuleRow, ST_CLIENT_ID, ST_CONNECTION_CREDENTIALS_ID, ST_VIEW_SUB_ID,
|
||||
};
|
||||
@@ -1617,62 +1617,20 @@ impl RelationalDB {
|
||||
}
|
||||
}
|
||||
|
||||
fn apply_history<H>(datastore: &Locking, database_identity: Identity, history: H) -> Result<(), DBError>
|
||||
where
|
||||
H: durability::History<TxData = Txdata>,
|
||||
{
|
||||
log::info!("[{database_identity}] DATABASE: applying transaction history...");
|
||||
|
||||
// TODO: Revisit once we actually replay history suffixes, ie. starting
|
||||
// from an offset larger than the history's min offset.
|
||||
// TODO: We may want to require that a `tokio::runtime::Handle` is
|
||||
// always supplied when constructing a `RelationalDB`. This would allow
|
||||
// to spawn a timer task here which just prints the progress periodically
|
||||
// in case the history is finite but very long.
|
||||
let (_, max_tx_offset) = history.tx_range_hint();
|
||||
let mut last_logged_percentage = 0;
|
||||
let progress = |tx_offset: u64| {
|
||||
if let Some(max_tx_offset) = max_tx_offset {
|
||||
let percentage = f64::floor((tx_offset as f64 / max_tx_offset as f64) * 100.0) as i32;
|
||||
if percentage > last_logged_percentage && percentage % 10 == 0 {
|
||||
log::info!("[{database_identity}] Loaded {percentage}% ({tx_offset}/{max_tx_offset})");
|
||||
last_logged_percentage = percentage;
|
||||
}
|
||||
// Print _something_ even if we don't know what's still ahead.
|
||||
} else if tx_offset.is_multiple_of(10_000) {
|
||||
log::info!("[{database_identity}] Loading transaction {tx_offset}");
|
||||
}
|
||||
fn apply_history(
|
||||
datastore: &Locking,
|
||||
database_identity: Identity,
|
||||
history: impl durability::History<TxData = Txdata>,
|
||||
) -> Result<(), DBError> {
|
||||
let counters = ApplyHistoryCounters {
|
||||
replay_commitlog_time_seconds: WORKER_METRICS
|
||||
.replay_commitlog_time_seconds
|
||||
.with_label_values(&database_identity),
|
||||
replay_commitlog_num_commits: WORKER_METRICS
|
||||
.replay_commitlog_num_commits
|
||||
.with_label_values(&database_identity),
|
||||
};
|
||||
|
||||
let time_before = std::time::Instant::now();
|
||||
|
||||
let mut replay = datastore.replay(
|
||||
progress,
|
||||
// We don't want to instantiate an incorrect state;
|
||||
// if the commitlog contains an inconsistency we'd rather get a hard error than showing customers incorrect data.
|
||||
spacetimedb_datastore::locking_tx_datastore::ErrorBehavior::FailFast,
|
||||
);
|
||||
let start_tx_offset = replay.next_tx_offset();
|
||||
history
|
||||
.fold_transactions_from(start_tx_offset, &mut replay)
|
||||
.map_err(anyhow::Error::from)?;
|
||||
|
||||
let time_elapsed = time_before.elapsed();
|
||||
WORKER_METRICS
|
||||
.replay_commitlog_time_seconds
|
||||
.with_label_values(&database_identity)
|
||||
.set(time_elapsed.as_secs_f64());
|
||||
|
||||
let end_tx_offset = replay.next_tx_offset();
|
||||
WORKER_METRICS
|
||||
.replay_commitlog_num_commits
|
||||
.with_label_values(&database_identity)
|
||||
.set((end_tx_offset - start_tx_offset) as _);
|
||||
|
||||
log::info!("[{database_identity}] DATABASE: applied transaction history");
|
||||
datastore.rebuild_state_after_replay()?;
|
||||
log::info!("[{database_identity}] DATABASE: rebuilt state after replay");
|
||||
|
||||
spacetimedb_datastore::locking_tx_datastore::apply_history(datastore, database_identity, history, counters)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
@@ -24,7 +24,7 @@ use crate::{
|
||||
},
|
||||
};
|
||||
use anyhow::anyhow;
|
||||
use core::{cell::RefCell, ops::RangeBounds};
|
||||
use core::ops::RangeBounds;
|
||||
use parking_lot::{Mutex, RwLock};
|
||||
use spacetimedb_data_structures::map::{HashCollectionExt, HashMap};
|
||||
use spacetimedb_durability::TxOffset;
|
||||
@@ -33,7 +33,7 @@ use spacetimedb_lib::{ConnectionId, Identity};
|
||||
use spacetimedb_paths::server::SnapshotDirPath;
|
||||
use spacetimedb_primitives::{ColId, ColList, ConstraintId, IndexId, SequenceId, TableId, ViewId};
|
||||
use spacetimedb_sats::memory_usage::MemoryUsage;
|
||||
use spacetimedb_sats::{bsatn, AlgebraicValue, ProductValue};
|
||||
use spacetimedb_sats::{AlgebraicValue, ProductValue};
|
||||
use spacetimedb_schema::table_name::TableName;
|
||||
use spacetimedb_schema::{
|
||||
reducer_name::ReducerName,
|
||||
@@ -48,7 +48,6 @@ use spacetimedb_table::{
|
||||
use std::borrow::Cow;
|
||||
use std::sync::Arc;
|
||||
use std::time::{Duration, Instant};
|
||||
use thiserror::Error;
|
||||
|
||||
pub type Result<T> = std::result::Result<T, DatastoreError>;
|
||||
|
||||
@@ -171,13 +170,9 @@ impl Locking {
|
||||
/// The provided closure will be called for each transaction found in the
|
||||
/// history, the parameter is the transaction's offset. The closure is called
|
||||
/// _before_ the transaction is applied to the database state.
|
||||
pub fn replay<F: FnMut(u64)>(&self, progress: F, error_behavior: ErrorBehavior) -> Replay<F> {
|
||||
Replay {
|
||||
database_identity: self.database_identity,
|
||||
committed_state: self.committed_state.clone(),
|
||||
progress: RefCell::new(progress),
|
||||
error_behavior,
|
||||
}
|
||||
pub fn replay<F: FnMut(u64)>(&self, progress: F, error_behavior: ErrorBehavior) -> Replay<'_, F> {
|
||||
let committed_state = self.committed_state.write();
|
||||
Replay::new(self.database_identity, committed_state, progress, error_behavior)
|
||||
}
|
||||
|
||||
/// Construct a new [`Locking`] datastore containing the state stored in `snapshot`.
|
||||
@@ -999,18 +994,6 @@ impl Locking {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Error)]
|
||||
pub enum ReplayError {
|
||||
#[error("Expected tx offset {expected}, encountered {encountered}")]
|
||||
InvalidOffset { expected: u64, encountered: u64 },
|
||||
#[error(transparent)]
|
||||
Decode(#[from] bsatn::DecodeError),
|
||||
#[error(transparent)]
|
||||
Db(#[from] DatastoreError),
|
||||
#[error(transparent)]
|
||||
Any(#[from] anyhow::Error),
|
||||
}
|
||||
|
||||
/// Construct a [`Metadata`] from the given [`RowRef`],
|
||||
/// reading only the columns necessary to construct the value.
|
||||
fn metadata_from_row(row: RowRef<'_>) -> Result<Metadata> {
|
||||
@@ -1041,7 +1024,6 @@ pub(crate) mod tests {
|
||||
};
|
||||
use crate::traits::{IsolationLevel, MutTx};
|
||||
use crate::Result;
|
||||
use bsatn::to_vec;
|
||||
use core::{fmt, mem};
|
||||
use itertools::Itertools;
|
||||
use pretty_assertions::{assert_eq, assert_matches};
|
||||
@@ -1053,7 +1035,7 @@ pub(crate) mod tests {
|
||||
use spacetimedb_lib::{resolved_type_via_v9, ScheduleAt, TimeDuration};
|
||||
use spacetimedb_primitives::{col_list, ArgId, ColId, ScheduleId, ViewId};
|
||||
use spacetimedb_sats::algebraic_value::ser::value_serialize;
|
||||
use spacetimedb_sats::bsatn::ToBsatn;
|
||||
use spacetimedb_sats::bsatn::{to_vec, ToBsatn};
|
||||
use spacetimedb_sats::layout::RowTypeLayout;
|
||||
use spacetimedb_sats::raw_identifier::RawIdentifier;
|
||||
use spacetimedb_sats::{product, AlgebraicType, GroundSpacetimeType, SumTypeVariant, SumValue};
|
||||
|
||||
@@ -9,7 +9,7 @@ pub mod state_view;
|
||||
pub use state_view::{IterByColEqTx, IterByColRangeTx};
|
||||
pub mod delete_table;
|
||||
mod replay;
|
||||
pub use replay::{ErrorBehavior, Replay};
|
||||
pub use replay::{apply_history, ApplyHistoryCounters, ErrorBehavior, Replay};
|
||||
mod tx;
|
||||
pub use tx::{NumDistinctValues, TxId};
|
||||
mod tx_state;
|
||||
|
||||
@@ -1,46 +1,137 @@
|
||||
use super::committed_state::CommittedState;
|
||||
use super::datastore::Result;
|
||||
use super::datastore::{Locking, Result};
|
||||
use crate::db_metrics::DB_METRICS;
|
||||
use crate::error::{IndexError, TableError};
|
||||
use crate::locking_tx_datastore::datastore::ReplayError;
|
||||
use crate::locking_tx_datastore::state_view::iter_st_column_for_table;
|
||||
use crate::locking_tx_datastore::state_view::StateView;
|
||||
use crate::system_tables::{is_built_in_meta_row, StFields as _};
|
||||
use crate::system_tables::{StColumnRow, StTableFields, StTableRow, ST_COLUMN_ID, ST_TABLE_ID};
|
||||
use crate::error::{DatastoreError, IndexError, TableError};
|
||||
use crate::locking_tx_datastore::state_view::{iter_st_column_for_table, StateView};
|
||||
use crate::system_tables::{
|
||||
is_built_in_meta_row, StColumnRow, StFields as _, StTableFields, StTableRow, ST_COLUMN_ID, ST_TABLE_ID,
|
||||
};
|
||||
use anyhow::{anyhow, Context};
|
||||
use core::cell::RefMut;
|
||||
use core::ops::{Deref, DerefMut, RangeBounds};
|
||||
use parking_lot::{RwLock, RwLockReadGuard};
|
||||
use parking_lot::RwLockWriteGuard;
|
||||
use prometheus::core::{AtomicF64, GenericGauge};
|
||||
use prometheus::IntGauge;
|
||||
use spacetimedb_commitlog::payload::txdata;
|
||||
use spacetimedb_data_structures::map::{HashSet, IntMap, IntSet};
|
||||
use spacetimedb_durability::History;
|
||||
use spacetimedb_durability::Txdata;
|
||||
use spacetimedb_lib::Identity;
|
||||
use spacetimedb_primitives::{ColId, ColList, TableId};
|
||||
use spacetimedb_sats::algebraic_value::de::ValueDeserializer;
|
||||
use spacetimedb_sats::buffer::BufReader;
|
||||
use spacetimedb_sats::{AlgebraicValue, Deserialize, ProductValue};
|
||||
use spacetimedb_sats::{bsatn, AlgebraicValue, Deserialize, ProductValue};
|
||||
use spacetimedb_schema::schema::{ColumnSchema, TableSchema};
|
||||
use spacetimedb_schema::table_name::TableName;
|
||||
use spacetimedb_table::indexes::RowPointer;
|
||||
use spacetimedb_table::table::{InsertError, RowRef};
|
||||
use std::cell::RefCell;
|
||||
use std::sync::Arc;
|
||||
use thiserror::Error;
|
||||
|
||||
pub fn apply_history(
|
||||
datastore: &Locking,
|
||||
database_identity: Identity,
|
||||
history: impl History<TxData = Txdata<ProductValue>>,
|
||||
counters: ApplyHistoryCounters,
|
||||
) -> Result<()> {
|
||||
log::info!("[{database_identity}] DATABASE: applying transaction history...");
|
||||
|
||||
// TODO: Revisit once we actually replay history suffixes, ie. starting
|
||||
// from an offset larger than the history's min offset.
|
||||
// TODO: We may want to require that a `tokio::runtime::Handle` is
|
||||
// always supplied when constructing a `RelationalDB`. This would allow
|
||||
// to spawn a timer task here which just prints the progress periodically
|
||||
// in case the history is finite but very long.
|
||||
let (_, max_tx_offset) = history.tx_range_hint();
|
||||
let mut last_logged_percentage = 0;
|
||||
let progress = |tx_offset: u64| {
|
||||
if let Some(max_tx_offset) = max_tx_offset {
|
||||
let percentage = f64::floor((tx_offset as f64 / max_tx_offset as f64) * 100.0) as i32;
|
||||
if percentage > last_logged_percentage && percentage % 10 == 0 {
|
||||
log::info!("[{database_identity}] Loaded {percentage}% ({tx_offset}/{max_tx_offset})");
|
||||
last_logged_percentage = percentage;
|
||||
}
|
||||
// Print _something_ even if we don't know what's still ahead.
|
||||
} else if tx_offset.is_multiple_of(10_000) {
|
||||
log::info!("[{database_identity}] Loading transaction {tx_offset}");
|
||||
}
|
||||
};
|
||||
|
||||
let time_before = std::time::Instant::now();
|
||||
|
||||
let mut replay = datastore.replay(
|
||||
progress,
|
||||
// We don't want to instantiate an incorrect state;
|
||||
// if the commitlog contains an inconsistency we'd rather get a hard error than showing customers incorrect data.
|
||||
ErrorBehavior::FailFast,
|
||||
);
|
||||
let start_tx_offset = replay.next_tx_offset();
|
||||
history
|
||||
.fold_transactions_from(start_tx_offset, &mut replay)
|
||||
.map_err(anyhow::Error::from)?;
|
||||
|
||||
let time_elapsed = time_before.elapsed();
|
||||
counters.replay_commitlog_time_seconds.set(time_elapsed.as_secs_f64());
|
||||
|
||||
let end_tx_offset = replay.next_tx_offset();
|
||||
counters
|
||||
.replay_commitlog_num_commits
|
||||
.set((end_tx_offset - start_tx_offset) as _);
|
||||
|
||||
log::info!("[{database_identity}] DATABASE: applied transaction history");
|
||||
drop(replay); // Neccessary to avoid a deadlock.
|
||||
datastore.rebuild_state_after_replay()?;
|
||||
log::info!("[{database_identity}] DATABASE: rebuilt state after replay");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub struct ApplyHistoryCounters {
|
||||
pub replay_commitlog_time_seconds: GenericGauge<AtomicF64>,
|
||||
pub replay_commitlog_num_commits: IntGauge,
|
||||
}
|
||||
|
||||
#[derive(Debug, Error)]
|
||||
pub enum ReplayError {
|
||||
#[error("Expected tx offset {expected}, encountered {encountered}")]
|
||||
InvalidOffset { expected: u64, encountered: u64 },
|
||||
#[error(transparent)]
|
||||
Decode(#[from] bsatn::DecodeError),
|
||||
#[error(transparent)]
|
||||
Db(#[from] DatastoreError),
|
||||
#[error(transparent)]
|
||||
Any(#[from] anyhow::Error),
|
||||
}
|
||||
|
||||
/// A [`spacetimedb_commitlog::Decoder`] suitable for replaying a transaction
|
||||
/// history into the database state.
|
||||
pub struct Replay<F> {
|
||||
pub(super) database_identity: Identity,
|
||||
pub(super) committed_state: Arc<RwLock<CommittedState>>,
|
||||
pub(super) progress: RefCell<F>,
|
||||
pub(super) error_behavior: ErrorBehavior,
|
||||
pub struct Replay<'a, F> {
|
||||
database_identity: Identity,
|
||||
committed_state: RefCell<ReplayCommittedState<'a>>,
|
||||
progress: RefCell<F>,
|
||||
error_behavior: ErrorBehavior,
|
||||
}
|
||||
|
||||
impl<F> Replay<F> {
|
||||
fn using_visitor<T>(&self, f: impl FnOnce(&mut ReplayVisitor<'_, F>) -> T) -> T {
|
||||
let mut committed_state = self.committed_state.write();
|
||||
let state = &mut *committed_state;
|
||||
let committed_state = ReplayCommittedState::new(state);
|
||||
impl<'a, F> Replay<'a, F> {
|
||||
pub fn new(
|
||||
database_identity: Identity,
|
||||
committed_state: RwLockWriteGuard<'a, CommittedState>,
|
||||
progress: F,
|
||||
error_behavior: ErrorBehavior,
|
||||
) -> Self {
|
||||
Self {
|
||||
database_identity,
|
||||
committed_state: RefCell::new(ReplayCommittedState::new(committed_state)),
|
||||
progress: RefCell::new(progress),
|
||||
error_behavior,
|
||||
}
|
||||
}
|
||||
|
||||
fn using_visitor<T>(&self, f: impl FnOnce(&mut ReplayVisitor<'_, '_, F>) -> T) -> T {
|
||||
let mut visitor = ReplayVisitor {
|
||||
database_identity: &self.database_identity,
|
||||
committed_state,
|
||||
committed_state: &mut self.committed_state.borrow_mut(),
|
||||
progress: &mut *self.progress.borrow_mut(),
|
||||
dropped_table_names: IntMap::default(),
|
||||
error_behavior: self.error_behavior,
|
||||
@@ -49,16 +140,16 @@ impl<F> Replay<F> {
|
||||
}
|
||||
|
||||
pub fn next_tx_offset(&self) -> u64 {
|
||||
self.committed_state.read_arc().next_tx_offset
|
||||
self.committed_state.borrow().next_tx_offset
|
||||
}
|
||||
|
||||
// NOTE: This is not unused.
|
||||
pub fn committed_state(&self) -> RwLockReadGuard<'_, CommittedState> {
|
||||
self.committed_state.read()
|
||||
pub fn committed_state(&self) -> RefMut<'_, ReplayCommittedState<'a>> {
|
||||
self.committed_state.borrow_mut()
|
||||
}
|
||||
}
|
||||
|
||||
impl<F: FnMut(u64)> spacetimedb_commitlog::Decoder for &mut Replay<F> {
|
||||
impl<F: FnMut(u64)> spacetimedb_commitlog::Decoder for &mut Replay<'_, F> {
|
||||
type Record = txdata::Txdata<ProductValue>;
|
||||
type Error = txdata::DecoderError<ReplayError>;
|
||||
|
||||
@@ -139,9 +230,9 @@ pub enum ErrorBehavior {
|
||||
Warn,
|
||||
}
|
||||
|
||||
struct ReplayVisitor<'a, F> {
|
||||
struct ReplayVisitor<'a, 'cs, F> {
|
||||
database_identity: &'a Identity,
|
||||
committed_state: ReplayCommittedState<'a>,
|
||||
committed_state: &'a mut ReplayCommittedState<'cs>,
|
||||
progress: &'a mut F,
|
||||
// Since deletes are handled before truncation / drop, sometimes the schema
|
||||
// info is gone. We save the name on the first delete of that table so metrics
|
||||
@@ -150,7 +241,7 @@ struct ReplayVisitor<'a, F> {
|
||||
error_behavior: ErrorBehavior,
|
||||
}
|
||||
|
||||
impl<F> ReplayVisitor<'_, F> {
|
||||
impl<F> ReplayVisitor<'_, '_, F> {
|
||||
/// Process `err` according to `self.error_behavior`,
|
||||
/// either warning about it or returning it.
|
||||
///
|
||||
@@ -166,7 +257,7 @@ impl<F> ReplayVisitor<'_, F> {
|
||||
}
|
||||
}
|
||||
|
||||
impl<F: FnMut(u64)> spacetimedb_commitlog::payload::txdata::Visitor for ReplayVisitor<'_, F> {
|
||||
impl<F: FnMut(u64)> spacetimedb_commitlog::payload::txdata::Visitor for ReplayVisitor<'_, '_, F> {
|
||||
type Error = ReplayError;
|
||||
// NOTE: Technically, this could be `()` if and when we can extract the
|
||||
// row data without going through `ProductValue` (PV).
|
||||
@@ -314,9 +405,9 @@ impl<F: FnMut(u64)> spacetimedb_commitlog::payload::txdata::Visitor for ReplayVi
|
||||
}
|
||||
|
||||
/// A `CommittedState` under construction during replay.
|
||||
struct ReplayCommittedState<'cs> {
|
||||
/// The committed state being contructed.
|
||||
state: &'cs mut CommittedState,
|
||||
pub struct ReplayCommittedState<'cs> {
|
||||
/// The committed state being constructed.
|
||||
state: RwLockWriteGuard<'cs, CommittedState>,
|
||||
|
||||
/// Whether the table was dropped within the current transaction during replay.
|
||||
///
|
||||
@@ -361,25 +452,25 @@ struct ReplayCommittedState<'cs> {
|
||||
///
|
||||
/// [`RowPointer`]s from this set are passed to the `unsafe` [`Table::get_row_ref_unchecked`],
|
||||
/// so it's important to properly maintain only [`RowPointer`]s to valid, extant, non-deleted rows.
|
||||
pub(super) replay_table_updated: IntMap<TableId, RowPointer>,
|
||||
replay_table_updated: IntMap<TableId, RowPointer>,
|
||||
}
|
||||
|
||||
impl Deref for ReplayCommittedState<'_> {
|
||||
type Target = CommittedState;
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
self.state
|
||||
&self.state
|
||||
}
|
||||
}
|
||||
|
||||
impl DerefMut for ReplayCommittedState<'_> {
|
||||
fn deref_mut(&mut self) -> &mut Self::Target {
|
||||
&mut *self.state
|
||||
&mut self.state
|
||||
}
|
||||
}
|
||||
|
||||
impl<'cs> ReplayCommittedState<'cs> {
|
||||
fn new(state: &'cs mut CommittedState) -> Self {
|
||||
fn new(state: RwLockWriteGuard<'cs, CommittedState>) -> Self {
|
||||
Self {
|
||||
state,
|
||||
replay_table_dropped: <_>::default(),
|
||||
@@ -512,7 +603,7 @@ impl<'cs> ReplayCommittedState<'cs> {
|
||||
let target_col_id = ColId::deserialize(ValueDeserializer::from_ref(&st_column_row.elements[1]))
|
||||
.expect("second field in `st_column` should decode to a `ColId`");
|
||||
|
||||
let outdated_st_column_rows = iter_st_column_for_table(self.state, &target_table_id.into())?
|
||||
let outdated_st_column_rows = iter_st_column_for_table(self, &target_table_id.into())?
|
||||
.filter_map(|row_ref| {
|
||||
StColumnRow::try_from(row_ref)
|
||||
.map(|c| (c.col_pos == target_col_id && row_ref.pointer() != row_ptr).then(|| row_ref.pointer()))
|
||||
@@ -542,7 +633,7 @@ impl<'cs> ReplayCommittedState<'cs> {
|
||||
// and not the other one, as it is being replaced.
|
||||
// `Self::ignore_previous_version_of_column` has marked the old version as ignored,
|
||||
// so filter only the non-ignored columns.
|
||||
let mut columns = iter_st_column_for_table(self.state, &table_id.into())?
|
||||
let mut columns = iter_st_column_for_table(self, &table_id.into())?
|
||||
.filter(|row_ref| !self.replay_columns_to_ignore.contains(&row_ref.pointer()))
|
||||
.map(|row_ref| {
|
||||
let row = StColumnRow::try_from(row_ref)?;
|
||||
@@ -766,7 +857,7 @@ mod tests {
|
||||
// Directly call replay_insert on committed state.
|
||||
let row = u32_str_u32(1, "Carol", 40);
|
||||
{
|
||||
let state = &mut *datastore.committed_state.write();
|
||||
let state = datastore.committed_state.write();
|
||||
let mut committed_state = ReplayCommittedState::new(state);
|
||||
committed_state.replay_insert(table_id, &schema, &row)?;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user