From 809aebd7c49112d4875cf8307e9f0512abb6c3ef Mon Sep 17 00:00:00 2001 From: Mazdak Farrokhzad Date: Thu, 16 Apr 2026 15:08:30 +0200 Subject: [PATCH] Move field `replay_table_updated` to `ReplayCommittedState` (#4807) # Description of Changes Shaves another 32 bytes off of `CommittedState` by moving the last replay-only map out of the type. # API and ABI breaking changes None # Expected complexity level and risk 2 # Testing Covered by existing tests. --- .../locking_tx_datastore/committed_state.rs | 39 -------- .../datastore/src/locking_tx_datastore/mod.rs | 2 +- .../src/locking_tx_datastore/replay.rs | 93 ++++++++++++++++++- 3 files changed, 91 insertions(+), 43 deletions(-) diff --git a/crates/datastore/src/locking_tx_datastore/committed_state.rs b/crates/datastore/src/locking_tx_datastore/committed_state.rs index 6b60e16ce..9398ff66a 100644 --- a/crates/datastore/src/locking_tx_datastore/committed_state.rs +++ b/crates/datastore/src/locking_tx_datastore/committed_state.rs @@ -87,24 +87,6 @@ pub struct CommittedState { /// - system tables: `st_view_sub`, `st_view_arg` /// - Tables which back views. pub(super) ephemeral_tables: EphemeralTables, - - /// Set of tables whose `st_table` entries have been updated during the currently-replaying transaction, - /// mapped to the current most-recent `st_table` row. - /// - /// When processing an insert to `st_table`, if the table already exists, we'll record it here. - /// Then, when we see a corresponding delete, we know that the table has not been dropped, - /// and so we won't delete the in-memory structure or insert its ID into [`Self::replay_table_dropped`]. - /// - /// When looking up the `st_table` row for a table, if it has an entry here, - /// that means there are two rows resident in `st_table` at this point in replay. - /// We return the row recorded here rather than inspecting `st_table`. 
- /// - /// We remove from this set when we reach the matching delete, - /// and assert this set is empty at the end of each transaction. - /// - /// [`RowPointer`]s from this set are passed to the `unsafe` [`Table::get_row_ref_unchecked`], - /// so it's important to properly maintain only [`RowPointer`]s to valid, extant, non-deleted rows. - pub(super) replay_table_updated: IntMap, } impl CommittedState { @@ -138,7 +120,6 @@ impl MemoryUsage for CommittedState { page_pool: _, read_sets, ephemeral_tables, - replay_table_updated, } = self; // NOTE(centril): We do not want to include the heap usage of `page_pool` as it's a shared resource. next_tx_offset.heap_usage() @@ -147,7 +128,6 @@ impl MemoryUsage for CommittedState { + index_id_map.heap_usage() + read_sets.heap_usage() + ephemeral_tables.heap_usage() - + replay_table_updated.heap_usage() } } @@ -205,24 +185,6 @@ impl StateView for CommittedState { None => Ok(ScanOrIndex::scan_eq(cols, val, self.iter(table_id)?)), } } - - /// Find the `st_table` row for `table_id`, first inspecting [`Self::replay_table_updated`], - /// then falling back to [`Self::iter_by_col_eq`] of `st_table`. - fn find_st_table_row(&self, table_id: TableId) -> Result { - let row_ref = if let Some(row_ptr) = self.replay_table_updated.get(&table_id) { - let (table, blob_store, _) = self.get_table_and_blob_store(table_id)?; - // Safety: `row_ptr` is stored in `self.replay_table_updated`, - // meaning it was inserted into `st_table` by `replay_insert` - // and has not yet been deleted by `replay_delete_by_rel`. - unsafe { table.get_row_ref_unchecked(blob_store, *row_ptr) } - } else { - self.iter_by_col_eq(ST_TABLE_ID, StTableFields::TableId, &table_id.into())? - .next() - .ok_or_else(|| TableError::IdNotFound(SystemTable::st_table, table_id.into()))? 
- }; - - StTableRow::try_from(row_ref) - } } impl CommittedState { @@ -235,7 +197,6 @@ impl CommittedState { read_sets: <_>::default(), page_pool, ephemeral_tables: <_>::default(), - replay_table_updated: <_>::default(), } } diff --git a/crates/datastore/src/locking_tx_datastore/mod.rs b/crates/datastore/src/locking_tx_datastore/mod.rs index ea1c21017..21954e703 100644 --- a/crates/datastore/src/locking_tx_datastore/mod.rs +++ b/crates/datastore/src/locking_tx_datastore/mod.rs @@ -9,7 +9,7 @@ pub mod state_view; pub use state_view::{IterByColEqTx, IterByColRangeTx}; pub mod delete_table; mod replay; -pub use replay::ErrorBehavior; +pub use replay::{ErrorBehavior, Replay}; mod tx; pub use tx::{NumDistinctValues, TxId}; mod tx_state; diff --git a/crates/datastore/src/locking_tx_datastore/replay.rs b/crates/datastore/src/locking_tx_datastore/replay.rs index 51a8ef86d..76611c776 100644 --- a/crates/datastore/src/locking_tx_datastore/replay.rs +++ b/crates/datastore/src/locking_tx_datastore/replay.rs @@ -8,12 +8,12 @@ use crate::locking_tx_datastore::state_view::StateView; use crate::system_tables::{is_built_in_meta_row, StFields as _}; use crate::system_tables::{StColumnRow, StTableFields, StTableRow, ST_COLUMN_ID, ST_TABLE_ID}; use anyhow::{anyhow, Context}; -use core::ops::{Deref, DerefMut}; -use parking_lot::RwLock; +use core::ops::{Deref, DerefMut, RangeBounds}; +use parking_lot::{RwLock, RwLockReadGuard}; use spacetimedb_commitlog::payload::txdata; use spacetimedb_data_structures::map::{HashSet, IntMap, IntSet}; use spacetimedb_lib::Identity; -use spacetimedb_primitives::{ColId, TableId}; +use spacetimedb_primitives::{ColId, ColList, TableId}; use spacetimedb_sats::algebraic_value::de::ValueDeserializer; use spacetimedb_sats::buffer::BufReader; use spacetimedb_sats::{AlgebraicValue, Deserialize, ProductValue}; @@ -51,6 +51,11 @@ impl Replay { pub fn next_tx_offset(&self) -> u64 { self.committed_state.read_arc().next_tx_offset } + + // NOTE: This is not unused. 
+ pub fn committed_state(&self) -> RwLockReadGuard<'_, CommittedState> { + self.committed_state.read() + } } impl spacetimedb_commitlog::Decoder for &mut Replay { @@ -339,6 +344,24 @@ struct ReplayCommittedState<'cs> { /// and delete from it during [`Self::replay_delete`] of `st_column` rows. /// We assert this is empty at the end of each transaction. replay_columns_to_ignore: HashSet, + + /// Set of tables whose `st_table` entries have been updated during the currently-replaying transaction, + /// mapped to the current most-recent `st_table` row. + /// + /// When processing an insert to `st_table`, if the table already exists, we'll record it here. + /// Then, when we see a corresponding delete, we know that the table has not been dropped, + /// and so we won't delete the in-memory structure or insert its ID into [`Self::replay_table_dropped`]. + /// + /// When looking up the `st_table` row for a table, if it has an entry here, + /// that means there are two rows resident in `st_table` at this point in replay. + /// We return the row recorded here rather than inspecting `st_table`. + /// + /// We remove from this set when we reach the matching delete, + /// and assert this set is empty at the end of each transaction. + /// + /// [`RowPointer`]s from this set are passed to the `unsafe` [`Table::get_row_ref_unchecked`], + /// so it's important to properly maintain only [`RowPointer`]s to valid, extant, non-deleted rows. + pub(super) replay_table_updated: IntMap, } impl Deref for ReplayCommittedState<'_> { @@ -361,6 +384,7 @@ impl<'cs> ReplayCommittedState<'cs> { state, replay_table_dropped: <_>::default(), replay_columns_to_ignore: <_>::default(), + replay_table_updated: <_>::default(), } } @@ -655,6 +679,69 @@ impl<'cs> ReplayCommittedState<'cs> { } } +impl StateView for ReplayCommittedState<'_> { + /// Find the `st_table` row for `table_id`, + /// first inspecting [`Self::replay_table_updated`], + /// then falling back to [`CommittedState::iter_by_col_eq`]. 
+ fn find_st_table_row(&self, table_id: TableId) -> Result { + if let Some(row_ptr) = self.replay_table_updated.get(&table_id) { + let (table, blob_store, _) = self.state.get_table_and_blob_store(table_id)?; + // SAFETY: `row_ptr` is stored in `self.replay_table_updated`, + // meaning it was inserted into `st_table` by `replay_insert` + // and has not yet been deleted by `replay_delete_by_rel`. + let row_ref = unsafe { table.get_row_ref_unchecked(blob_store, *row_ptr) }; + StTableRow::try_from(row_ref) + } else { + self.state.find_st_table_row(table_id) + } + } + + type Iter<'a> + = ::Iter<'a> + where + Self: 'a; + + type IterByColRange<'a, R: RangeBounds> + = ::IterByColRange<'a, R> + where + Self: 'a; + + type IterByColEq<'a, 'r> + = ::IterByColEq<'a, 'r> + where + Self: 'a; + + fn get_schema(&self, table_id: TableId) -> Option<&Arc> { + self.state.get_schema(table_id) + } + + fn table_row_count(&self, table_id: TableId) -> Option { + self.state.table_row_count(table_id) + } + + fn iter(&self, table_id: TableId) -> Result> { + self.state.iter(table_id) + } + + fn iter_by_col_range>( + &self, + table_id: TableId, + cols: ColList, + range: R, + ) -> Result> { + self.state.iter_by_col_range(table_id, cols, range) + } + + fn iter_by_col_eq<'a, 'r>( + &'a self, + table_id: TableId, + cols: impl Into, + value: &'r AlgebraicValue, + ) -> Result> { + self.state.iter_by_col_eq(table_id, cols, value) + } +} + #[cfg(test)] mod tests { use crate::{