mirror of
https://github.com/clockworklabs/SpacetimeDB.git
synced 2026-05-06 07:26:43 -04:00
Finish refactoring out replay (#4850)
# Description of Changes

Move the rest of the replay logic to `mod replay`. Closes #4055.

# API and ABI breaking changes

None

# Expected complexity level and risk

2

# Testing

Just code motion.
This commit is contained in:
committed by
GitHub
parent
7c51914cd5
commit
a32cffa769
@@ -8,12 +8,11 @@ use super::{
|
||||
};
|
||||
use crate::{
|
||||
db_metrics::DB_METRICS,
|
||||
error::{DatastoreError, TableError, ViewError},
|
||||
error::TableError,
|
||||
execution_context::ExecutionContext,
|
||||
locking_tx_datastore::{mut_tx::ViewReadSets, state_view::ScanOrIndex, IterByColRangeTx},
|
||||
system_tables::{
|
||||
system_tables, table_id_is_reserved, StColumnRow, StConstraintData, StConstraintRow, StIndexRow,
|
||||
StSequenceFields, StSequenceRow, StTableFields, StTableRow, StViewRow, SystemTable, ST_CLIENT_ID,
|
||||
system_tables, StColumnRow, StConstraintRow, StIndexRow, StSequenceRow, StTableRow, SystemTable, ST_CLIENT_ID,
|
||||
ST_CLIENT_IDX, ST_COLUMN_ID, ST_COLUMN_IDX, ST_COLUMN_NAME, ST_CONSTRAINT_ID, ST_CONSTRAINT_IDX,
|
||||
ST_CONSTRAINT_NAME, ST_INDEX_ID, ST_INDEX_IDX, ST_INDEX_NAME, ST_MODULE_ID, ST_MODULE_IDX,
|
||||
ST_ROW_LEVEL_SECURITY_ID, ST_ROW_LEVEL_SECURITY_IDX, ST_SCHEDULED_ID, ST_SCHEDULED_IDX, ST_SEQUENCE_ID,
|
||||
@@ -33,13 +32,13 @@ use crate::{
|
||||
};
|
||||
use anyhow::anyhow;
|
||||
use core::{convert::Infallible, ops::RangeBounds};
|
||||
use spacetimedb_data_structures::map::{HashMap, HashSet, IntMap, IntSet};
|
||||
use spacetimedb_data_structures::map::{IntMap, IntSet};
|
||||
use spacetimedb_durability::TxOffset;
|
||||
use spacetimedb_lib::{db::auth::StTableType, Identity};
|
||||
use spacetimedb_primitives::{ColList, ColSet, IndexId, SequenceId, TableId, ViewId};
|
||||
use spacetimedb_primitives::{ColList, IndexId, TableId, ViewId};
|
||||
use spacetimedb_sats::memory_usage::MemoryUsage;
|
||||
use spacetimedb_sats::{AlgebraicValue, ProductValue};
|
||||
use spacetimedb_schema::{def::IndexAlgorithm, schema::TableSchema};
|
||||
use spacetimedb_schema::schema::TableSchema;
|
||||
use spacetimedb_table::{
|
||||
blob_store::{BlobStore, HashMapBlobStore},
|
||||
indexes::{RowPointer, SquashedOffset},
|
||||
@@ -200,93 +199,6 @@ impl CommittedState {
|
||||
}
|
||||
}
|
||||
|
||||
/// Delete all but the highest-allocation `st_sequence` row for each system sequence.
|
||||
///
|
||||
/// Prior versions of `RelationalDb::migrate_system_tables` (defined in the `core` crate)
|
||||
/// initialized newly-created system sequences to `allocation: 4097`,
|
||||
/// while `committed_state::bootstrap_system_tables` sets `allocation: 4096`.
|
||||
/// This affected the system table migration which added
|
||||
/// `st_view_view_id_seq` and `st_view_arg_id_seq`.
|
||||
/// As a result, when replaying these databases' commitlogs without a snapshot,
|
||||
/// we will end up with two rows in `st_sequence` for each of these sequences,
|
||||
/// resulting in a unique constraint violation in `CommittedState::build_indexes`.
|
||||
/// We call this method in [`super::datastore::Locking::rebuild_state_after_replay`]
|
||||
/// to avoid that unique constraint violation.
|
||||
pub(super) fn fixup_delete_duplicate_system_sequence_rows(&mut self) {
|
||||
struct StSequenceRowInfo {
|
||||
sequence_id: SequenceId,
|
||||
allocated: i128,
|
||||
row_pointer: RowPointer,
|
||||
}
|
||||
|
||||
// Get all the `st_sequence` rows which refer to sequences on system tables,
|
||||
// including any duplicates caused by the bug described above.
|
||||
let sequence_rows = self
|
||||
.table_scan(ST_SEQUENCE_ID)
|
||||
.expect("`st_sequence` should exist")
|
||||
.filter_map(|row_ref| {
|
||||
// Read the table ID to which the sequence refers,
|
||||
// in order to determine if this is a system sequence or not.
|
||||
let table_id = row_ref
|
||||
.read_col::<TableId>(StSequenceFields::TableId)
|
||||
.expect("`st_sequence` row should conform to `st_sequence` schema");
|
||||
|
||||
// If this sequence refers to a system table, it may need a fixup.
|
||||
// User tables' sequences will never need fixups.
|
||||
table_id_is_reserved(table_id).then(|| {
|
||||
let allocated = row_ref
|
||||
.read_col::<i128>(StSequenceFields::Allocated)
|
||||
.expect("`st_sequence` row should conform to `st_sequence` schema");
|
||||
let sequence_id = row_ref
|
||||
.read_col::<SequenceId>(StSequenceFields::SequenceId)
|
||||
.expect("`st_sequence` row should conform to `st_sequence` schema");
|
||||
StSequenceRowInfo {
|
||||
allocated,
|
||||
sequence_id,
|
||||
row_pointer: row_ref.pointer(),
|
||||
}
|
||||
})
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
let (st_sequence, blob_store, ..) = self
|
||||
.get_table_and_blob_store_mut(ST_SEQUENCE_ID)
|
||||
.expect("`st_sequence` should exist");
|
||||
|
||||
// Track the row with the highest allocation for each sequence.
|
||||
let mut highest_allocations: HashMap<SequenceId, (i128, RowPointer)> = HashMap::default();
|
||||
|
||||
for StSequenceRowInfo {
|
||||
sequence_id,
|
||||
allocated,
|
||||
row_pointer,
|
||||
} in sequence_rows
|
||||
{
|
||||
// For each `st_sequence` row which refers to a system table,
|
||||
// if we've already seen a row for the same sequence,
|
||||
// keep only the row with the higher allocation.
|
||||
if let Some((prev_allocated, prev_row_pointer)) =
|
||||
highest_allocations.insert(sequence_id, (allocated, row_pointer))
|
||||
{
|
||||
// We have a duplicate row. We want to keep whichever has the higher `allocated`,
|
||||
// and delete the other.
|
||||
let row_pointer_to_delete = if prev_allocated > allocated {
|
||||
// The previous row has a higher allocation than the new row,
|
||||
// so delete the new row and restore `previous` to `highest_allocations`.
|
||||
highest_allocations.insert(sequence_id, (prev_allocated, prev_row_pointer));
|
||||
row_pointer
|
||||
} else {
|
||||
// The previous row does not have a higher allocation than the new,
|
||||
// so delete the previous row and keep the new one.
|
||||
prev_row_pointer
|
||||
};
|
||||
|
||||
st_sequence.delete(blob_store, row_pointer_to_delete, |_| ())
|
||||
.expect("Duplicated `st_sequence` row at `row_pointer_to_delete` should be present in `st_sequence` during fixup");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Extremely delicate function to bootstrap the system tables.
|
||||
/// Don't update this unless you know what you're doing.
|
||||
pub(super) fn bootstrap_system_tables(&mut self, database_identity: Identity) -> Result<()> {
|
||||
@@ -484,106 +396,6 @@ impl CommittedState {
|
||||
Ok(sequence_state)
|
||||
}
|
||||
|
||||
pub(super) fn build_indexes(&mut self) -> Result<()> {
|
||||
let st_indexes = self.tables.get(&ST_INDEX_ID).unwrap();
|
||||
let rows = st_indexes
|
||||
.scan_rows(&self.blob_store)
|
||||
.map(StIndexRow::try_from)
|
||||
.collect::<Result<Vec<_>>>()?;
|
||||
|
||||
let st_constraints = self.tables.get(&ST_CONSTRAINT_ID).unwrap();
|
||||
let unique_constraints: HashSet<(TableId, ColSet)> = st_constraints
|
||||
.scan_rows(&self.blob_store)
|
||||
.map(StConstraintRow::try_from)
|
||||
.filter_map(Result::ok)
|
||||
.filter_map(|constraint| match constraint.constraint_data {
|
||||
StConstraintData::Unique { columns } => Some((constraint.table_id, columns)),
|
||||
_ => None,
|
||||
})
|
||||
.collect();
|
||||
|
||||
for index_row in rows {
|
||||
let index_id = index_row.index_id;
|
||||
let table_id = index_row.table_id;
|
||||
let (table, blob_store, index_id_map, _) = self
|
||||
.get_table_and_blob_store_mut(table_id)
|
||||
.expect("index should exist in committed state; cannot create it");
|
||||
let algo: IndexAlgorithm = index_row.index_algorithm.into();
|
||||
let columns: ColSet = algo.columns().into();
|
||||
let is_unique = unique_constraints.contains(&(table_id, columns));
|
||||
|
||||
let index = table.new_index(&algo, is_unique)?;
|
||||
// SAFETY: `index` was derived from `table`.
|
||||
unsafe { table.insert_index(blob_store, index_id, index) }
|
||||
.expect("rebuilding should not cause constraint violations");
|
||||
index_id_map.insert(index_id, table_id);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Recompute the ephemeral table ids and store them in `self.ephemeral_tables`.
///
/// # Errors
///
/// Propagates any error from [`Self::ephemeral_tables`]; the field is left untouched in that case.
pub(super) fn collect_ephemeral_tables(&mut self) -> Result<()> {
    let table_ids = self.ephemeral_tables()?;
    self.ephemeral_tables = table_ids.into_iter().collect();
    Ok(())
}
|
||||
|
||||
fn ephemeral_tables(&self) -> Result<Vec<TableId>> {
|
||||
let mut tables = vec![ST_VIEW_SUB_ID, ST_VIEW_ARG_ID];
|
||||
|
||||
let Some(st_view) = self.tables.get(&ST_VIEW_ID) else {
|
||||
return Ok(tables);
|
||||
};
|
||||
let backing_tables = st_view
|
||||
.scan_rows(&self.blob_store)
|
||||
.map(|row_ref| {
|
||||
let view_row = StViewRow::try_from(row_ref)?;
|
||||
view_row
|
||||
.table_id
|
||||
.ok_or_else(|| DatastoreError::View(ViewError::TableNotFound(view_row.view_id)))
|
||||
})
|
||||
.collect::<Result<Vec<_>>>()?;
|
||||
|
||||
tables.extend(backing_tables);
|
||||
|
||||
Ok(tables)
|
||||
}
|
||||
|
||||
/// After replaying all old transactions,
|
||||
/// inserts and deletes into the system tables
|
||||
/// might not be reflected in the schemas of the built tables.
|
||||
/// So we must re-schema every built table.
|
||||
pub(super) fn reschema_tables(&mut self) -> Result<()> {
|
||||
// For already built tables, we need to reschema them to account for constraints et al.
|
||||
let mut schemas = Vec::with_capacity(self.tables.len());
|
||||
for table_id in self.tables.keys().copied() {
|
||||
schemas.push(self.schema_for_table_raw(table_id)?);
|
||||
}
|
||||
for (table, schema) in self.tables.values_mut().zip(schemas) {
|
||||
table.with_mut_schema(|s| *s = schema);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// After replaying all old transactions, tables which have rows will
|
||||
/// have been created in memory, but tables with no rows will not have
|
||||
/// been created. This function ensures that they are created.
|
||||
pub(super) fn build_missing_tables(&mut self) -> Result<()> {
|
||||
// Find all ids of tables that are in `st_tables` but haven't been built.
|
||||
let table_ids = self
|
||||
.get_table(ST_TABLE_ID)
|
||||
.unwrap()
|
||||
.scan_rows(&self.blob_store)
|
||||
.map(|r| r.read_col(StTableFields::TableId).unwrap())
|
||||
.filter(|table_id| self.get_table(*table_id).is_none())
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
// Construct their schemas and insert tables for them.
|
||||
for table_id in table_ids {
|
||||
let schema = self.schema_for_table(table_id)?;
|
||||
self.create_table(table_id, schema);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Returns an iterator doing a full table scan on `table_id`.
|
||||
pub(super) fn table_scan<'a>(&'a self, table_id: TableId) -> Option<TableScanIter<'a>> {
|
||||
Some(self.get_table(table_id)?.scan_rows(&self.blob_store))
|
||||
@@ -1008,7 +820,7 @@ impl CommittedState {
|
||||
Table::new(schema, SquashedOffset::COMMITTED_STATE)
|
||||
}
|
||||
|
||||
fn create_table(&mut self, table_id: TableId, schema: Arc<TableSchema>) {
|
||||
pub(super) fn create_table(&mut self, table_id: TableId, schema: Arc<TableSchema>) {
|
||||
self.tables.insert(table_id, Self::make_table(schema));
|
||||
}
|
||||
|
||||
|
||||
@@ -3,7 +3,7 @@ use super::{
|
||||
tx_state::TxState,
|
||||
};
|
||||
use crate::execution_context::{Workload, WorkloadType};
|
||||
use crate::locking_tx_datastore::replay::{ErrorBehavior, Replay};
|
||||
use crate::locking_tx_datastore::replay::{build_sequence_state, ErrorBehavior, Replay};
|
||||
use crate::{
|
||||
db_metrics::DB_METRICS,
|
||||
error::{DatastoreError, TableError},
|
||||
@@ -68,7 +68,7 @@ pub struct Locking {
|
||||
// made private again.
|
||||
pub committed_state: Arc<RwLock<CommittedState>>,
|
||||
/// The state of sequence generation in this database.
|
||||
sequence_state: Arc<Mutex<SequencesState>>,
|
||||
pub(super) sequence_state: Arc<Mutex<SequencesState>>,
|
||||
/// The identity of this database.
|
||||
pub(crate) database_identity: Identity,
|
||||
}
|
||||
@@ -117,11 +117,7 @@ impl Locking {
|
||||
commit_state.bootstrap_system_tables(database_identity)?;
|
||||
// The database tables are now initialized with the correct data.
|
||||
// Now we have to build our in memory structures.
|
||||
{
|
||||
let sequence_state = commit_state.build_sequence_state()?;
|
||||
// Reset our sequence state so that they start in the right places.
|
||||
*datastore.sequence_state.lock() = sequence_state;
|
||||
}
|
||||
build_sequence_state(&datastore, &mut commit_state)?;
|
||||
|
||||
// We don't want to build indexes here; we'll build those later,
|
||||
// in `rebuild_state_after_replay`.
|
||||
@@ -132,38 +128,6 @@ impl Locking {
|
||||
Ok(datastore)
|
||||
}
|
||||
|
||||
/// The purpose of this is to rebuild the state of the datastore
|
||||
/// after having inserted all of rows from the message log.
|
||||
/// This is necessary because, for example, inserting a row into `st_table`
|
||||
/// is not equivalent to calling `create_table`.
|
||||
/// There may eventually be better way to do this, but this will have to do for now.
|
||||
pub fn rebuild_state_after_replay(&self) -> Result<()> {
|
||||
let mut committed_state = self.committed_state.write_arc();
|
||||
|
||||
// Prior versions of `RelationalDb::migrate_system_tables` (defined in the `core` crate)
|
||||
// initialized newly-created system sequences to `allocation: 4097`,
|
||||
// while `committed_state::bootstrap_system_tables` sets `allocation: 4096`.
|
||||
// This affected the system table migration which added
|
||||
// `st_view_view_id_seq` and `st_view_arg_id_seq`.
|
||||
// As a result, when replaying these databases' commitlogs without a snapshot,
|
||||
// we will end up with two rows in `st_sequence` for each of these sequences,
|
||||
// resulting in a unique constraint violation in `CommittedState::build_indexes`.
|
||||
// We fix this by, for each system sequence, deleting all but the row with the highest allocation.
|
||||
committed_state.fixup_delete_duplicate_system_sequence_rows();
|
||||
|
||||
// `build_missing_tables` must be called before indexes.
|
||||
// Honestly this should maybe just be one big procedure.
|
||||
// See John Carmack's philosophy on this.
|
||||
committed_state.reschema_tables()?;
|
||||
committed_state.build_missing_tables()?;
|
||||
committed_state.build_indexes()?;
|
||||
// Figure out where to pick up for each sequence.
|
||||
*self.sequence_state.lock() = committed_state.build_sequence_state()?;
|
||||
|
||||
committed_state.collect_ephemeral_tables()?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Obtain a [`spacetimedb_commitlog::Decoder`] suitable for replaying a
|
||||
/// [`spacetimedb_durability::History`] onto the currently committed state.
|
||||
///
|
||||
@@ -242,11 +206,7 @@ impl Locking {
|
||||
// Set the sequence state. In practice we will end up doing this again after replaying
|
||||
// the commit log, but we do it here too just to avoid having an incorrectly restored
|
||||
// snapshot.
|
||||
{
|
||||
let sequence_state = committed_state.build_sequence_state()?;
|
||||
// Reset our sequence state so that they start in the right places.
|
||||
*datastore.sequence_state.lock() = sequence_state;
|
||||
}
|
||||
build_sequence_state(&datastore, &mut committed_state)?;
|
||||
|
||||
// The next TX offset after restoring from a snapshot is one greater than the snapshotted offset.
|
||||
committed_state.next_tx_offset = tx_offset + 1;
|
||||
|
||||
@@ -1,10 +1,12 @@
|
||||
use super::committed_state::CommittedState;
|
||||
use super::datastore::{Locking, Result};
|
||||
use crate::db_metrics::DB_METRICS;
|
||||
use crate::error::{DatastoreError, IndexError, TableError};
|
||||
use crate::error::{DatastoreError, IndexError, TableError, ViewError};
|
||||
use crate::locking_tx_datastore::state_view::{iter_st_column_for_table, StateView};
|
||||
use crate::system_tables::{
|
||||
is_built_in_meta_row, StColumnRow, StFields as _, StTableFields, StTableRow, ST_COLUMN_ID, ST_TABLE_ID,
|
||||
is_built_in_meta_row, table_id_is_reserved, StColumnRow, StConstraintData, StConstraintRow, StFields as _,
|
||||
StIndexRow, StSequenceFields, StTableFields, StTableRow, StViewRow, ST_COLUMN_ID, ST_CONSTRAINT_ID, ST_INDEX_ID,
|
||||
ST_SEQUENCE_ID, ST_TABLE_ID, ST_VIEW_ARG_ID, ST_VIEW_ID, ST_VIEW_SUB_ID,
|
||||
};
|
||||
use anyhow::{anyhow, Context};
|
||||
use core::cell::RefMut;
|
||||
@@ -13,14 +15,15 @@ use parking_lot::RwLockWriteGuard;
|
||||
use prometheus::core::{AtomicF64, GenericGauge};
|
||||
use prometheus::IntGauge;
|
||||
use spacetimedb_commitlog::payload::txdata;
|
||||
use spacetimedb_data_structures::map::{HashSet, IntMap, IntSet};
|
||||
use spacetimedb_data_structures::map::{HashMap, HashSet, IntMap, IntSet};
|
||||
use spacetimedb_durability::History;
|
||||
use spacetimedb_durability::Txdata;
|
||||
use spacetimedb_lib::Identity;
|
||||
use spacetimedb_primitives::{ColId, ColList, TableId};
|
||||
use spacetimedb_primitives::{ColId, ColList, ColSet, SequenceId, TableId};
|
||||
use spacetimedb_sats::algebraic_value::de::ValueDeserializer;
|
||||
use spacetimedb_sats::buffer::BufReader;
|
||||
use spacetimedb_sats::{bsatn, AlgebraicValue, Deserialize, ProductValue};
|
||||
use spacetimedb_schema::def::IndexAlgorithm;
|
||||
use spacetimedb_schema::schema::{ColumnSchema, TableSchema};
|
||||
use spacetimedb_schema::table_name::TableName;
|
||||
use spacetimedb_table::indexes::RowPointer;
|
||||
@@ -80,8 +83,7 @@ pub fn apply_history(
|
||||
.set((end_tx_offset - start_tx_offset) as _);
|
||||
|
||||
log::info!("[{database_identity}] DATABASE: applied transaction history");
|
||||
drop(replay); // Necessary to avoid a deadlock.
|
||||
datastore.rebuild_state_after_replay()?;
|
||||
replay.committed_state().rebuild_state_after_replay(datastore)?;
|
||||
log::info!("[{database_identity}] DATABASE: rebuilt state after replay");
|
||||
|
||||
Ok(())
|
||||
@@ -479,6 +481,222 @@ impl<'cs> ReplayCommittedState<'cs> {
|
||||
}
|
||||
}
|
||||
|
||||
/// The purpose of this is to rebuild the state of the datastore
|
||||
/// after having inserted all of rows from the message log.
|
||||
/// This is necessary because, for example, inserting a row into `st_table`
|
||||
/// is not equivalent to calling `create_table`.
|
||||
/// There may eventually be better way to do this, but this will have to do for now.
|
||||
pub fn rebuild_state_after_replay(&mut self, datastore: &Locking) -> Result<()> {
|
||||
// Prior versions of `RelationalDb::migrate_system_tables` (defined in the `core` crate)
|
||||
// initialized newly-created system sequences to `allocation: 4097`,
|
||||
// while `committed_state::bootstrap_system_tables` sets `allocation: 4096`.
|
||||
// This affected the system table migration which added
|
||||
// `st_view_view_id_seq` and `st_view_arg_id_seq`.
|
||||
// As a result, when replaying these databases' commitlogs without a snapshot,
|
||||
// we will end up with two rows in `st_sequence` for each of these sequences,
|
||||
// resulting in a unique constraint violation in `Self::build_indexes`.
|
||||
// We fix this by, for each system sequence, deleting all but the row with the highest allocation.
|
||||
self.fixup_delete_duplicate_system_sequence_rows();
|
||||
|
||||
// `build_missing_tables` must be called before indexes.
|
||||
// Honestly this should maybe just be one big procedure.
|
||||
// See John Carmack's philosophy on this.
|
||||
self.reschema_tables()?;
|
||||
self.build_missing_tables()?;
|
||||
self.build_indexes()?;
|
||||
self.collect_ephemeral_tables()?;
|
||||
|
||||
// Figure out where to pick up for each sequence.
|
||||
build_sequence_state(datastore, self)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Delete all but the highest-allocation `st_sequence` row for each system sequence.
|
||||
///
|
||||
/// Prior versions of `RelationalDb::migrate_system_tables` (defined in the `core` crate)
|
||||
/// initialized newly-created system sequences to `allocation: 4097`,
|
||||
/// while [`CommittedState::bootstrap_system_tables`] sets `allocation: 4096`.
|
||||
/// This affected the system table migration which added
|
||||
/// `st_view_view_id_seq` and `st_view_arg_id_seq`.
|
||||
/// As a result, when replaying these databases' commitlogs without a snapshot,
|
||||
/// we will end up with two rows in `st_sequence` for each of these sequences,
|
||||
/// resulting in a unique constraint violation in `CommittedState::build_indexes`.
|
||||
/// We call this method in [`ReplayCommittedState::rebuild_state_after_replay`]
|
||||
/// to avoid that unique constraint violation.
|
||||
pub(super) fn fixup_delete_duplicate_system_sequence_rows(&mut self) {
|
||||
struct StSequenceRowInfo {
|
||||
sequence_id: SequenceId,
|
||||
allocated: i128,
|
||||
row_pointer: RowPointer,
|
||||
}
|
||||
|
||||
// Get all the `st_sequence` rows which refer to sequences on system tables,
|
||||
// including any duplicates caused by the bug described above.
|
||||
let sequence_rows = self
|
||||
.table_scan(ST_SEQUENCE_ID)
|
||||
.expect("`st_sequence` should exist")
|
||||
.filter_map(|row_ref| {
|
||||
// Read the table ID to which the sequence refers,
|
||||
// in order to determine if this is a system sequence or not.
|
||||
let table_id = row_ref
|
||||
.read_col::<TableId>(StSequenceFields::TableId)
|
||||
.expect("`st_sequence` row should conform to `st_sequence` schema");
|
||||
|
||||
// If this sequence refers to a system table, it may need a fixup.
|
||||
// User tables' sequences will never need fixups.
|
||||
table_id_is_reserved(table_id).then(|| {
|
||||
let allocated = row_ref
|
||||
.read_col::<i128>(StSequenceFields::Allocated)
|
||||
.expect("`st_sequence` row should conform to `st_sequence` schema");
|
||||
let sequence_id = row_ref
|
||||
.read_col::<SequenceId>(StSequenceFields::SequenceId)
|
||||
.expect("`st_sequence` row should conform to `st_sequence` schema");
|
||||
StSequenceRowInfo {
|
||||
allocated,
|
||||
sequence_id,
|
||||
row_pointer: row_ref.pointer(),
|
||||
}
|
||||
})
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
let (st_sequence, blob_store, ..) = self
|
||||
.get_table_and_blob_store_mut(ST_SEQUENCE_ID)
|
||||
.expect("`st_sequence` should exist");
|
||||
|
||||
// Track the row with the highest allocation for each sequence.
|
||||
let mut highest_allocations: HashMap<SequenceId, (i128, RowPointer)> = HashMap::default();
|
||||
|
||||
for StSequenceRowInfo {
|
||||
sequence_id,
|
||||
allocated,
|
||||
row_pointer,
|
||||
} in sequence_rows
|
||||
{
|
||||
// For each `st_sequence` row which refers to a system table,
|
||||
// if we've already seen a row for the same sequence,
|
||||
// keep only the row with the higher allocation.
|
||||
if let Some((prev_allocated, prev_row_pointer)) =
|
||||
highest_allocations.insert(sequence_id, (allocated, row_pointer))
|
||||
{
|
||||
// We have a duplicate row. We want to keep whichever has the higher `allocated`,
|
||||
// and delete the other.
|
||||
let row_pointer_to_delete = if prev_allocated > allocated {
|
||||
// The previous row has a higher allocation than the new row,
|
||||
// so delete the new row and restore `previous` to `highest_allocations`.
|
||||
highest_allocations.insert(sequence_id, (prev_allocated, prev_row_pointer));
|
||||
row_pointer
|
||||
} else {
|
||||
// The previous row does not have a higher allocation than the new,
|
||||
// so delete the previous row and keep the new one.
|
||||
prev_row_pointer
|
||||
};
|
||||
|
||||
st_sequence.delete(blob_store, row_pointer_to_delete, |_| ())
|
||||
.expect("Duplicated `st_sequence` row at `row_pointer_to_delete` should be present in `st_sequence` during fixup");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(super) fn build_indexes(&mut self) -> Result<()> {
|
||||
let st_indexes = self.tables.get(&ST_INDEX_ID).unwrap();
|
||||
let rows = st_indexes
|
||||
.scan_rows(&self.blob_store)
|
||||
.map(StIndexRow::try_from)
|
||||
.collect::<Result<Vec<_>>>()?;
|
||||
|
||||
let st_constraints = self.tables.get(&ST_CONSTRAINT_ID).unwrap();
|
||||
let unique_constraints: HashSet<(TableId, ColSet)> = st_constraints
|
||||
.scan_rows(&self.blob_store)
|
||||
.map(StConstraintRow::try_from)
|
||||
.filter_map(Result::ok)
|
||||
.filter_map(|constraint| match constraint.constraint_data {
|
||||
StConstraintData::Unique { columns } => Some((constraint.table_id, columns)),
|
||||
_ => None,
|
||||
})
|
||||
.collect();
|
||||
|
||||
for index_row in rows {
|
||||
let index_id = index_row.index_id;
|
||||
let table_id = index_row.table_id;
|
||||
let (table, blob_store, index_id_map, _) = self
|
||||
.get_table_and_blob_store_mut(table_id)
|
||||
.expect("index should exist in committed state; cannot create it");
|
||||
let algo: IndexAlgorithm = index_row.index_algorithm.into();
|
||||
let columns: ColSet = algo.columns().into();
|
||||
let is_unique = unique_constraints.contains(&(table_id, columns));
|
||||
|
||||
let index = table.new_index(&algo, is_unique)?;
|
||||
// SAFETY: `index` was derived from `table`.
|
||||
unsafe { table.insert_index(blob_store, index_id, index) }
|
||||
.expect("rebuilding should not cause constraint violations");
|
||||
index_id_map.insert(index_id, table_id);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Recompute the ephemeral table ids and store them in `self.ephemeral_tables`.
///
/// # Errors
///
/// Propagates any error from [`Self::ephemeral_tables`]; the field is left untouched in that case.
pub(super) fn collect_ephemeral_tables(&mut self) -> Result<()> {
    let table_ids = self.ephemeral_tables()?;
    self.ephemeral_tables = table_ids.into_iter().collect();
    Ok(())
}
|
||||
|
||||
fn ephemeral_tables(&self) -> Result<Vec<TableId>> {
|
||||
let mut tables = vec![ST_VIEW_SUB_ID, ST_VIEW_ARG_ID];
|
||||
|
||||
let Some(st_view) = self.tables.get(&ST_VIEW_ID) else {
|
||||
return Ok(tables);
|
||||
};
|
||||
let backing_tables = st_view
|
||||
.scan_rows(&self.blob_store)
|
||||
.map(|row_ref| {
|
||||
let StViewRow { table_id, view_id, .. } = StViewRow::try_from(row_ref)?;
|
||||
table_id.ok_or_else(|| DatastoreError::View(ViewError::TableNotFound(view_id)))
|
||||
})
|
||||
.collect::<Result<Vec<_>>>()?;
|
||||
|
||||
tables.extend(backing_tables);
|
||||
|
||||
Ok(tables)
|
||||
}
|
||||
|
||||
/// After replaying all old transactions,
|
||||
/// inserts and deletes into the system tables
|
||||
/// might not be reflected in the schemas of the built tables.
|
||||
/// So we must re-schema every built table.
|
||||
pub(super) fn reschema_tables(&mut self) -> Result<()> {
|
||||
// For already built tables, we need to reschema them to account for constraints et al.
|
||||
let mut schemas = Vec::with_capacity(self.tables.len());
|
||||
for table_id in self.tables.keys().copied() {
|
||||
schemas.push(self.schema_for_table_raw(table_id)?);
|
||||
}
|
||||
for (table, schema) in self.tables.values_mut().zip(schemas) {
|
||||
table.with_mut_schema(|s| *s = schema);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// After replaying all old transactions, tables which have rows will
|
||||
/// have been created in memory, but tables with no rows will not have
|
||||
/// been created. This function ensures that they are created.
|
||||
pub(super) fn build_missing_tables(&mut self) -> Result<()> {
|
||||
// Find all ids of tables that are in `st_tables` but haven't been built.
|
||||
let table_ids = self
|
||||
.get_table(ST_TABLE_ID)
|
||||
.unwrap()
|
||||
.scan_rows(&self.blob_store)
|
||||
.map(|r| r.read_col(StTableFields::TableId).unwrap())
|
||||
.filter(|table_id| self.get_table(*table_id).is_none())
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
// Construct their schemas and insert tables for them.
|
||||
for table_id in table_ids {
|
||||
let schema = self.schema_for_table(table_id)?;
|
||||
self.create_table(table_id, schema);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn replay_insert(&mut self, table_id: TableId, schema: &Arc<TableSchema>, row: &ProductValue) -> Result<()> {
|
||||
// Event table rows in the commitlog are preserved for future replay features
|
||||
// but don't rebuild state — event tables have no committed state.
|
||||
@@ -770,6 +988,13 @@ impl<'cs> ReplayCommittedState<'cs> {
|
||||
}
|
||||
}
|
||||
|
||||
/// Recompute the sequence state from `cs` and store it in `datastore`.
///
/// The new state is built before the datastore's sequence lock is taken,
/// so on error the previous sequence state is left in place.
///
/// # Errors
///
/// Propagates any error from `CommittedState::build_sequence_state`.
pub(super) fn build_sequence_state(datastore: &Locking, cs: &mut CommittedState) -> Result<()> {
    let rebuilt = cs.build_sequence_state()?;

    // Reset our sequence state so that the sequences start in the right places.
    let mut current = datastore.sequence_state.lock();
    *current = rebuilt;

    Ok(())
}
|
||||
|
||||
impl StateView for ReplayCommittedState<'_> {
|
||||
/// Find the `st_table` row for `table_id`,
|
||||
/// first inspecting [`Self::replay_table_updated`],
|
||||
|
||||
Reference in New Issue
Block a user