mirror of
https://github.com/clockworklabs/SpacetimeDB.git
synced 2026-05-06 07:26:43 -04:00
Move field replay_table_updated to ReplayCommittedState (#4807)
# Description of Changes Shaves another 32 bytes off of `CommittedState` by moving the last replay only map out of the type. # API and ABI breaking changes None # Expected complexity level and risk 2 # Testing Covered by existing tests.
This commit is contained in:
committed by
GitHub
parent
21b58ef993
commit
809aebd7c4
@@ -87,24 +87,6 @@ pub struct CommittedState {
|
||||
/// - system tables: `st_view_sub`, `st_view_arg`
|
||||
/// - Tables which back views.
|
||||
pub(super) ephemeral_tables: EphemeralTables,
|
||||
|
||||
/// Set of tables whose `st_table` entries have been updated during the currently-replaying transaction,
|
||||
/// mapped to the current most-recent `st_table` row.
|
||||
///
|
||||
/// When processing an insert to `st_table`, if the table already exists, we'll record it here.
|
||||
/// Then, when we see a corresponding delete, we know that the table has not been dropped,
|
||||
/// and so we won't delete the in-memory structure or insert its ID into [`Self::replay_table_dropped`].
|
||||
///
|
||||
/// When looking up the `st_table` row for a table, if it has an entry here,
|
||||
/// that means there are two rows resident in `st_table` at this point in replay.
|
||||
/// We return the row recorded here rather than inspecting `st_table`.
|
||||
///
|
||||
/// We remove from this set when we reach the matching delete,
|
||||
/// and assert this set is empty at the end of each transaction.
|
||||
///
|
||||
/// [`RowPointer`]s from this set are passed to the `unsafe` [`Table::get_row_ref_unchecked`],
|
||||
/// so it's important to properly maintain only [`RowPointer`]s to valid, extant, non-deleted rows.
|
||||
pub(super) replay_table_updated: IntMap<TableId, RowPointer>,
|
||||
}
|
||||
|
||||
impl CommittedState {
|
||||
@@ -138,7 +120,6 @@ impl MemoryUsage for CommittedState {
|
||||
page_pool: _,
|
||||
read_sets,
|
||||
ephemeral_tables,
|
||||
replay_table_updated,
|
||||
} = self;
|
||||
// NOTE(centril): We do not want to include the heap usage of `page_pool` as it's a shared resource.
|
||||
next_tx_offset.heap_usage()
|
||||
@@ -147,7 +128,6 @@ impl MemoryUsage for CommittedState {
|
||||
+ index_id_map.heap_usage()
|
||||
+ read_sets.heap_usage()
|
||||
+ ephemeral_tables.heap_usage()
|
||||
+ replay_table_updated.heap_usage()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -205,24 +185,6 @@ impl StateView for CommittedState {
|
||||
None => Ok(ScanOrIndex::scan_eq(cols, val, self.iter(table_id)?)),
|
||||
}
|
||||
}
|
||||
|
||||
/// Find the `st_table` row for `table_id`, first inspecting [`Self::replay_table_updated`],
|
||||
/// then falling back to [`Self::iter_by_col_eq`] of `st_table`.
|
||||
fn find_st_table_row(&self, table_id: TableId) -> Result<StTableRow> {
|
||||
let row_ref = if let Some(row_ptr) = self.replay_table_updated.get(&table_id) {
|
||||
let (table, blob_store, _) = self.get_table_and_blob_store(table_id)?;
|
||||
// Safety: `row_ptr` is stored in `self.replay_table_updated`,
|
||||
// meaning it was inserted into `st_table` by `replay_insert`
|
||||
// and has not yet been deleted by `replay_delete_by_rel`.
|
||||
unsafe { table.get_row_ref_unchecked(blob_store, *row_ptr) }
|
||||
} else {
|
||||
self.iter_by_col_eq(ST_TABLE_ID, StTableFields::TableId, &table_id.into())?
|
||||
.next()
|
||||
.ok_or_else(|| TableError::IdNotFound(SystemTable::st_table, table_id.into()))?
|
||||
};
|
||||
|
||||
StTableRow::try_from(row_ref)
|
||||
}
|
||||
}
|
||||
|
||||
impl CommittedState {
|
||||
@@ -235,7 +197,6 @@ impl CommittedState {
|
||||
read_sets: <_>::default(),
|
||||
page_pool,
|
||||
ephemeral_tables: <_>::default(),
|
||||
replay_table_updated: <_>::default(),
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -9,7 +9,7 @@ pub mod state_view;
|
||||
pub use state_view::{IterByColEqTx, IterByColRangeTx};
|
||||
pub mod delete_table;
|
||||
mod replay;
|
||||
pub use replay::ErrorBehavior;
|
||||
pub use replay::{ErrorBehavior, Replay};
|
||||
mod tx;
|
||||
pub use tx::{NumDistinctValues, TxId};
|
||||
mod tx_state;
|
||||
|
||||
@@ -8,12 +8,12 @@ use crate::locking_tx_datastore::state_view::StateView;
|
||||
use crate::system_tables::{is_built_in_meta_row, StFields as _};
|
||||
use crate::system_tables::{StColumnRow, StTableFields, StTableRow, ST_COLUMN_ID, ST_TABLE_ID};
|
||||
use anyhow::{anyhow, Context};
|
||||
use core::ops::{Deref, DerefMut};
|
||||
use parking_lot::RwLock;
|
||||
use core::ops::{Deref, DerefMut, RangeBounds};
|
||||
use parking_lot::{RwLock, RwLockReadGuard};
|
||||
use spacetimedb_commitlog::payload::txdata;
|
||||
use spacetimedb_data_structures::map::{HashSet, IntMap, IntSet};
|
||||
use spacetimedb_lib::Identity;
|
||||
use spacetimedb_primitives::{ColId, TableId};
|
||||
use spacetimedb_primitives::{ColId, ColList, TableId};
|
||||
use spacetimedb_sats::algebraic_value::de::ValueDeserializer;
|
||||
use spacetimedb_sats::buffer::BufReader;
|
||||
use spacetimedb_sats::{AlgebraicValue, Deserialize, ProductValue};
|
||||
@@ -51,6 +51,11 @@ impl<F> Replay<F> {
|
||||
pub fn next_tx_offset(&self) -> u64 {
|
||||
self.committed_state.read_arc().next_tx_offset
|
||||
}
|
||||
|
||||
// NOTE: This is not unused.
|
||||
pub fn committed_state(&self) -> RwLockReadGuard<'_, CommittedState> {
|
||||
self.committed_state.read()
|
||||
}
|
||||
}
|
||||
|
||||
impl<F: FnMut(u64)> spacetimedb_commitlog::Decoder for &mut Replay<F> {
|
||||
@@ -339,6 +344,24 @@ struct ReplayCommittedState<'cs> {
|
||||
/// and delete from it during [`Self::replay_delete`] of `st_column` rows.
|
||||
/// We assert this is empty at the end of each transaction.
|
||||
replay_columns_to_ignore: HashSet<RowPointer>,
|
||||
|
||||
/// Set of tables whose `st_table` entries have been updated during the currently-replaying transaction,
|
||||
/// mapped to the current most-recent `st_table` row.
|
||||
///
|
||||
/// When processing an insert to `st_table`, if the table already exists, we'll record it here.
|
||||
/// Then, when we see a corresponding delete, we know that the table has not been dropped,
|
||||
/// and so we won't delete the in-memory structure or insert its ID into [`Self::replay_table_dropped`].
|
||||
///
|
||||
/// When looking up the `st_table` row for a table, if it has an entry here,
|
||||
/// that means there are two rows resident in `st_table` at this point in replay.
|
||||
/// We return the row recorded here rather than inspecting `st_table`.
|
||||
///
|
||||
/// We remove from this set when we reach the matching delete,
|
||||
/// and assert this set is empty at the end of each transaction.
|
||||
///
|
||||
/// [`RowPointer`]s from this set are passed to the `unsafe` [`Table::get_row_ref_unchecked`],
|
||||
/// so it's important to properly maintain only [`RowPointer`]s to valid, extant, non-deleted rows.
|
||||
pub(super) replay_table_updated: IntMap<TableId, RowPointer>,
|
||||
}
|
||||
|
||||
impl Deref for ReplayCommittedState<'_> {
|
||||
@@ -361,6 +384,7 @@ impl<'cs> ReplayCommittedState<'cs> {
|
||||
state,
|
||||
replay_table_dropped: <_>::default(),
|
||||
replay_columns_to_ignore: <_>::default(),
|
||||
replay_table_updated: <_>::default(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -655,6 +679,69 @@ impl<'cs> ReplayCommittedState<'cs> {
|
||||
}
|
||||
}
|
||||
|
||||
impl StateView for ReplayCommittedState<'_> {
|
||||
/// Find the `st_table` row for `table_id`,
|
||||
/// first inspecting [`Self::replay_table_updated`],
|
||||
/// then falling back to [`CommittedState::iter_by_col_eq`].
|
||||
fn find_st_table_row(&self, table_id: TableId) -> Result<StTableRow> {
|
||||
if let Some(row_ptr) = self.replay_table_updated.get(&table_id) {
|
||||
let (table, blob_store, _) = self.state.get_table_and_blob_store(table_id)?;
|
||||
// SAFETY: `row_ptr` is stored in `self.replay_table_updated`,
|
||||
// meaning it was inserted into `st_table` by `replay_insert`
|
||||
// and has not yet been deleted by `replay_delete_by_rel`.
|
||||
let row_ref = unsafe { table.get_row_ref_unchecked(blob_store, *row_ptr) };
|
||||
StTableRow::try_from(row_ref)
|
||||
} else {
|
||||
self.state.find_st_table_row(table_id)
|
||||
}
|
||||
}
|
||||
|
||||
type Iter<'a>
|
||||
= <CommittedState as StateView>::Iter<'a>
|
||||
where
|
||||
Self: 'a;
|
||||
|
||||
type IterByColRange<'a, R: RangeBounds<AlgebraicValue>>
|
||||
= <CommittedState as StateView>::IterByColRange<'a, R>
|
||||
where
|
||||
Self: 'a;
|
||||
|
||||
type IterByColEq<'a, 'r>
|
||||
= <CommittedState as StateView>::IterByColEq<'a, 'r>
|
||||
where
|
||||
Self: 'a;
|
||||
|
||||
fn get_schema(&self, table_id: TableId) -> Option<&Arc<TableSchema>> {
|
||||
self.state.get_schema(table_id)
|
||||
}
|
||||
|
||||
fn table_row_count(&self, table_id: TableId) -> Option<u64> {
|
||||
self.state.table_row_count(table_id)
|
||||
}
|
||||
|
||||
fn iter(&self, table_id: TableId) -> Result<Self::Iter<'_>> {
|
||||
self.state.iter(table_id)
|
||||
}
|
||||
|
||||
fn iter_by_col_range<R: RangeBounds<AlgebraicValue>>(
|
||||
&self,
|
||||
table_id: TableId,
|
||||
cols: ColList,
|
||||
range: R,
|
||||
) -> Result<Self::IterByColRange<'_, R>> {
|
||||
self.state.iter_by_col_range(table_id, cols, range)
|
||||
}
|
||||
|
||||
fn iter_by_col_eq<'a, 'r>(
|
||||
&'a self,
|
||||
table_id: TableId,
|
||||
cols: impl Into<ColList>,
|
||||
value: &'r AlgebraicValue,
|
||||
) -> Result<Self::IterByColEq<'a, 'r>> {
|
||||
self.state.iter_by_col_eq(table_id, cols, value)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use crate::{
|
||||
|
||||
Reference in New Issue
Block a user