mirror of
https://github.com/clockworklabs/SpacetimeDB.git
synced 2026-05-06 07:26:43 -04:00
Move field replay_table_updated to ReplayCommittedState (#4807)
# Description of Changes

Shaves another 32 bytes off of `CommittedState` by moving the last replay-only map out of the type.

# API and ABI breaking changes

None

# Expected complexity level and risk

2

# Testing

Covered by existing tests.
This commit is contained in:
committed by
GitHub
parent
21b58ef993
commit
809aebd7c4
@@ -87,24 +87,6 @@ pub struct CommittedState {
|
|||||||
/// - system tables: `st_view_sub`, `st_view_arg`
|
/// - system tables: `st_view_sub`, `st_view_arg`
|
||||||
/// - Tables which back views.
|
/// - Tables which back views.
|
||||||
pub(super) ephemeral_tables: EphemeralTables,
|
pub(super) ephemeral_tables: EphemeralTables,
|
||||||
|
|
||||||
/// Set of tables whose `st_table` entries have been updated during the currently-replaying transaction,
|
|
||||||
/// mapped to the current most-recent `st_table` row.
|
|
||||||
///
|
|
||||||
/// When processing an insert to `st_table`, if the table already exists, we'll record it here.
|
|
||||||
/// Then, when we see a corresponding delete, we know that the table has not been dropped,
|
|
||||||
/// and so we won't delete the in-memory structure or insert its ID into [`Self::replay_table_dropped`].
|
|
||||||
///
|
|
||||||
/// When looking up the `st_table` row for a table, if it has an entry here,
|
|
||||||
/// that means there are two rows resident in `st_table` at this point in replay.
|
|
||||||
/// We return the row recorded here rather than inspecting `st_table`.
|
|
||||||
///
|
|
||||||
/// We remove from this set when we reach the matching delete,
|
|
||||||
/// and assert this set is empty at the end of each transaction.
|
|
||||||
///
|
|
||||||
/// [`RowPointer`]s from this set are passed to the `unsafe` [`Table::get_row_ref_unchecked`],
|
|
||||||
/// so it's important to properly maintain only [`RowPointer`]s to valid, extant, non-deleted rows.
|
|
||||||
pub(super) replay_table_updated: IntMap<TableId, RowPointer>,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl CommittedState {
|
impl CommittedState {
|
||||||
@@ -138,7 +120,6 @@ impl MemoryUsage for CommittedState {
|
|||||||
page_pool: _,
|
page_pool: _,
|
||||||
read_sets,
|
read_sets,
|
||||||
ephemeral_tables,
|
ephemeral_tables,
|
||||||
replay_table_updated,
|
|
||||||
} = self;
|
} = self;
|
||||||
// NOTE(centril): We do not want to include the heap usage of `page_pool` as it's a shared resource.
|
// NOTE(centril): We do not want to include the heap usage of `page_pool` as it's a shared resource.
|
||||||
next_tx_offset.heap_usage()
|
next_tx_offset.heap_usage()
|
||||||
@@ -147,7 +128,6 @@ impl MemoryUsage for CommittedState {
|
|||||||
+ index_id_map.heap_usage()
|
+ index_id_map.heap_usage()
|
||||||
+ read_sets.heap_usage()
|
+ read_sets.heap_usage()
|
||||||
+ ephemeral_tables.heap_usage()
|
+ ephemeral_tables.heap_usage()
|
||||||
+ replay_table_updated.heap_usage()
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -205,24 +185,6 @@ impl StateView for CommittedState {
|
|||||||
None => Ok(ScanOrIndex::scan_eq(cols, val, self.iter(table_id)?)),
|
None => Ok(ScanOrIndex::scan_eq(cols, val, self.iter(table_id)?)),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Find the `st_table` row for `table_id`, first inspecting [`Self::replay_table_updated`],
|
|
||||||
/// then falling back to [`Self::iter_by_col_eq`] of `st_table`.
|
|
||||||
fn find_st_table_row(&self, table_id: TableId) -> Result<StTableRow> {
|
|
||||||
let row_ref = if let Some(row_ptr) = self.replay_table_updated.get(&table_id) {
|
|
||||||
let (table, blob_store, _) = self.get_table_and_blob_store(table_id)?;
|
|
||||||
// Safety: `row_ptr` is stored in `self.replay_table_updated`,
|
|
||||||
// meaning it was inserted into `st_table` by `replay_insert`
|
|
||||||
// and has not yet been deleted by `replay_delete_by_rel`.
|
|
||||||
unsafe { table.get_row_ref_unchecked(blob_store, *row_ptr) }
|
|
||||||
} else {
|
|
||||||
self.iter_by_col_eq(ST_TABLE_ID, StTableFields::TableId, &table_id.into())?
|
|
||||||
.next()
|
|
||||||
.ok_or_else(|| TableError::IdNotFound(SystemTable::st_table, table_id.into()))?
|
|
||||||
};
|
|
||||||
|
|
||||||
StTableRow::try_from(row_ref)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl CommittedState {
|
impl CommittedState {
|
||||||
@@ -235,7 +197,6 @@ impl CommittedState {
|
|||||||
read_sets: <_>::default(),
|
read_sets: <_>::default(),
|
||||||
page_pool,
|
page_pool,
|
||||||
ephemeral_tables: <_>::default(),
|
ephemeral_tables: <_>::default(),
|
||||||
replay_table_updated: <_>::default(),
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -9,7 +9,7 @@ pub mod state_view;
|
|||||||
pub use state_view::{IterByColEqTx, IterByColRangeTx};
|
pub use state_view::{IterByColEqTx, IterByColRangeTx};
|
||||||
pub mod delete_table;
|
pub mod delete_table;
|
||||||
mod replay;
|
mod replay;
|
||||||
pub use replay::ErrorBehavior;
|
pub use replay::{ErrorBehavior, Replay};
|
||||||
mod tx;
|
mod tx;
|
||||||
pub use tx::{NumDistinctValues, TxId};
|
pub use tx::{NumDistinctValues, TxId};
|
||||||
mod tx_state;
|
mod tx_state;
|
||||||
|
|||||||
@@ -8,12 +8,12 @@ use crate::locking_tx_datastore::state_view::StateView;
|
|||||||
use crate::system_tables::{is_built_in_meta_row, StFields as _};
|
use crate::system_tables::{is_built_in_meta_row, StFields as _};
|
||||||
use crate::system_tables::{StColumnRow, StTableFields, StTableRow, ST_COLUMN_ID, ST_TABLE_ID};
|
use crate::system_tables::{StColumnRow, StTableFields, StTableRow, ST_COLUMN_ID, ST_TABLE_ID};
|
||||||
use anyhow::{anyhow, Context};
|
use anyhow::{anyhow, Context};
|
||||||
use core::ops::{Deref, DerefMut};
|
use core::ops::{Deref, DerefMut, RangeBounds};
|
||||||
use parking_lot::RwLock;
|
use parking_lot::{RwLock, RwLockReadGuard};
|
||||||
use spacetimedb_commitlog::payload::txdata;
|
use spacetimedb_commitlog::payload::txdata;
|
||||||
use spacetimedb_data_structures::map::{HashSet, IntMap, IntSet};
|
use spacetimedb_data_structures::map::{HashSet, IntMap, IntSet};
|
||||||
use spacetimedb_lib::Identity;
|
use spacetimedb_lib::Identity;
|
||||||
use spacetimedb_primitives::{ColId, TableId};
|
use spacetimedb_primitives::{ColId, ColList, TableId};
|
||||||
use spacetimedb_sats::algebraic_value::de::ValueDeserializer;
|
use spacetimedb_sats::algebraic_value::de::ValueDeserializer;
|
||||||
use spacetimedb_sats::buffer::BufReader;
|
use spacetimedb_sats::buffer::BufReader;
|
||||||
use spacetimedb_sats::{AlgebraicValue, Deserialize, ProductValue};
|
use spacetimedb_sats::{AlgebraicValue, Deserialize, ProductValue};
|
||||||
@@ -51,6 +51,11 @@ impl<F> Replay<F> {
|
|||||||
pub fn next_tx_offset(&self) -> u64 {
|
pub fn next_tx_offset(&self) -> u64 {
|
||||||
self.committed_state.read_arc().next_tx_offset
|
self.committed_state.read_arc().next_tx_offset
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// NOTE: This is not unused.
|
||||||
|
pub fn committed_state(&self) -> RwLockReadGuard<'_, CommittedState> {
|
||||||
|
self.committed_state.read()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<F: FnMut(u64)> spacetimedb_commitlog::Decoder for &mut Replay<F> {
|
impl<F: FnMut(u64)> spacetimedb_commitlog::Decoder for &mut Replay<F> {
|
||||||
@@ -339,6 +344,24 @@ struct ReplayCommittedState<'cs> {
|
|||||||
/// and delete from it during [`Self::replay_delete`] of `st_column` rows.
|
/// and delete from it during [`Self::replay_delete`] of `st_column` rows.
|
||||||
/// We assert this is empty at the end of each transaction.
|
/// We assert this is empty at the end of each transaction.
|
||||||
replay_columns_to_ignore: HashSet<RowPointer>,
|
replay_columns_to_ignore: HashSet<RowPointer>,
|
||||||
|
|
||||||
|
/// Set of tables whose `st_table` entries have been updated during the currently-replaying transaction,
|
||||||
|
/// mapped to the current most-recent `st_table` row.
|
||||||
|
///
|
||||||
|
/// When processing an insert to `st_table`, if the table already exists, we'll record it here.
|
||||||
|
/// Then, when we see a corresponding delete, we know that the table has not been dropped,
|
||||||
|
/// and so we won't delete the in-memory structure or insert its ID into [`Self::replay_table_dropped`].
|
||||||
|
///
|
||||||
|
/// When looking up the `st_table` row for a table, if it has an entry here,
|
||||||
|
/// that means there are two rows resident in `st_table` at this point in replay.
|
||||||
|
/// We return the row recorded here rather than inspecting `st_table`.
|
||||||
|
///
|
||||||
|
/// We remove from this set when we reach the matching delete,
|
||||||
|
/// and assert this set is empty at the end of each transaction.
|
||||||
|
///
|
||||||
|
/// [`RowPointer`]s from this set are passed to the `unsafe` [`Table::get_row_ref_unchecked`],
|
||||||
|
/// so it's important to properly maintain only [`RowPointer`]s to valid, extant, non-deleted rows.
|
||||||
|
pub(super) replay_table_updated: IntMap<TableId, RowPointer>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Deref for ReplayCommittedState<'_> {
|
impl Deref for ReplayCommittedState<'_> {
|
||||||
@@ -361,6 +384,7 @@ impl<'cs> ReplayCommittedState<'cs> {
|
|||||||
state,
|
state,
|
||||||
replay_table_dropped: <_>::default(),
|
replay_table_dropped: <_>::default(),
|
||||||
replay_columns_to_ignore: <_>::default(),
|
replay_columns_to_ignore: <_>::default(),
|
||||||
|
replay_table_updated: <_>::default(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -655,6 +679,69 @@ impl<'cs> ReplayCommittedState<'cs> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl StateView for ReplayCommittedState<'_> {
|
||||||
|
/// Find the `st_table` row for `table_id`,
|
||||||
|
/// first inspecting [`Self::replay_table_updated`],
|
||||||
|
/// then falling back to [`CommittedState::iter_by_col_eq`].
|
||||||
|
fn find_st_table_row(&self, table_id: TableId) -> Result<StTableRow> {
|
||||||
|
if let Some(row_ptr) = self.replay_table_updated.get(&table_id) {
|
||||||
|
let (table, blob_store, _) = self.state.get_table_and_blob_store(table_id)?;
|
||||||
|
// SAFETY: `row_ptr` is stored in `self.replay_table_updated`,
|
||||||
|
// meaning it was inserted into `st_table` by `replay_insert`
|
||||||
|
// and has not yet been deleted by `replay_delete_by_rel`.
|
||||||
|
let row_ref = unsafe { table.get_row_ref_unchecked(blob_store, *row_ptr) };
|
||||||
|
StTableRow::try_from(row_ref)
|
||||||
|
} else {
|
||||||
|
self.state.find_st_table_row(table_id)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
type Iter<'a>
|
||||||
|
= <CommittedState as StateView>::Iter<'a>
|
||||||
|
where
|
||||||
|
Self: 'a;
|
||||||
|
|
||||||
|
type IterByColRange<'a, R: RangeBounds<AlgebraicValue>>
|
||||||
|
= <CommittedState as StateView>::IterByColRange<'a, R>
|
||||||
|
where
|
||||||
|
Self: 'a;
|
||||||
|
|
||||||
|
type IterByColEq<'a, 'r>
|
||||||
|
= <CommittedState as StateView>::IterByColEq<'a, 'r>
|
||||||
|
where
|
||||||
|
Self: 'a;
|
||||||
|
|
||||||
|
fn get_schema(&self, table_id: TableId) -> Option<&Arc<TableSchema>> {
|
||||||
|
self.state.get_schema(table_id)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn table_row_count(&self, table_id: TableId) -> Option<u64> {
|
||||||
|
self.state.table_row_count(table_id)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn iter(&self, table_id: TableId) -> Result<Self::Iter<'_>> {
|
||||||
|
self.state.iter(table_id)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn iter_by_col_range<R: RangeBounds<AlgebraicValue>>(
|
||||||
|
&self,
|
||||||
|
table_id: TableId,
|
||||||
|
cols: ColList,
|
||||||
|
range: R,
|
||||||
|
) -> Result<Self::IterByColRange<'_, R>> {
|
||||||
|
self.state.iter_by_col_range(table_id, cols, range)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn iter_by_col_eq<'a, 'r>(
|
||||||
|
&'a self,
|
||||||
|
table_id: TableId,
|
||||||
|
cols: impl Into<ColList>,
|
||||||
|
value: &'r AlgebraicValue,
|
||||||
|
) -> Result<Self::IterByColEq<'a, 'r>> {
|
||||||
|
self.state.iter_by_col_eq(table_id, cols, value)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use crate::{
|
use crate::{
|
||||||
|
|||||||
Reference in New Issue
Block a user