Finish refactoring out replay (#4850)

# Description of Changes

Move the rest of replay logic to `mod replay`.

Closes #4055.

# API and ABI breaking changes

None

# Expected complexity level and risk

2

# Testing

Just code motion.
This commit is contained in:
Mazdak Farrokhzad
2026-04-23 12:49:20 +02:00
committed by GitHub
parent 7c51914cd5
commit a32cffa769
3 changed files with 241 additions and 244 deletions
@@ -8,12 +8,11 @@ use super::{
};
use crate::{
db_metrics::DB_METRICS,
error::{DatastoreError, TableError, ViewError},
error::TableError,
execution_context::ExecutionContext,
locking_tx_datastore::{mut_tx::ViewReadSets, state_view::ScanOrIndex, IterByColRangeTx},
system_tables::{
system_tables, table_id_is_reserved, StColumnRow, StConstraintData, StConstraintRow, StIndexRow,
StSequenceFields, StSequenceRow, StTableFields, StTableRow, StViewRow, SystemTable, ST_CLIENT_ID,
system_tables, StColumnRow, StConstraintRow, StIndexRow, StSequenceRow, StTableRow, SystemTable, ST_CLIENT_ID,
ST_CLIENT_IDX, ST_COLUMN_ID, ST_COLUMN_IDX, ST_COLUMN_NAME, ST_CONSTRAINT_ID, ST_CONSTRAINT_IDX,
ST_CONSTRAINT_NAME, ST_INDEX_ID, ST_INDEX_IDX, ST_INDEX_NAME, ST_MODULE_ID, ST_MODULE_IDX,
ST_ROW_LEVEL_SECURITY_ID, ST_ROW_LEVEL_SECURITY_IDX, ST_SCHEDULED_ID, ST_SCHEDULED_IDX, ST_SEQUENCE_ID,
@@ -33,13 +32,13 @@ use crate::{
};
use anyhow::anyhow;
use core::{convert::Infallible, ops::RangeBounds};
use spacetimedb_data_structures::map::{HashMap, HashSet, IntMap, IntSet};
use spacetimedb_data_structures::map::{IntMap, IntSet};
use spacetimedb_durability::TxOffset;
use spacetimedb_lib::{db::auth::StTableType, Identity};
use spacetimedb_primitives::{ColList, ColSet, IndexId, SequenceId, TableId, ViewId};
use spacetimedb_primitives::{ColList, IndexId, TableId, ViewId};
use spacetimedb_sats::memory_usage::MemoryUsage;
use spacetimedb_sats::{AlgebraicValue, ProductValue};
use spacetimedb_schema::{def::IndexAlgorithm, schema::TableSchema};
use spacetimedb_schema::schema::TableSchema;
use spacetimedb_table::{
blob_store::{BlobStore, HashMapBlobStore},
indexes::{RowPointer, SquashedOffset},
@@ -200,93 +199,6 @@ impl CommittedState {
}
}
/// Delete all but the highest-allocation `st_sequence` row for each system sequence.
///
/// Prior versions of `RelationalDb::migrate_system_tables` (defined in the `core` crate)
/// initialized newly-created system sequences to `allocation: 4097`,
/// while `committed_state::bootstrap_system_tables` sets `allocation: 4096`.
/// This affected the system table migration which added
/// `st_view_view_id_seq` and `st_view_arg_id_seq`.
/// As a result, when replaying these databases' commitlogs without a snapshot,
/// we will end up with two rows in `st_sequence` for each of these sequences,
/// resulting in a unique constraint violation in `CommittedState::build_indexes`.
/// We call this method in [`super::datastore::Locking::rebuild_state_after_replay`]
/// to avoid that unique constraint violation.
pub(super) fn fixup_delete_duplicate_system_sequence_rows(&mut self) {
struct StSequenceRowInfo {
sequence_id: SequenceId,
allocated: i128,
row_pointer: RowPointer,
}
// Get all the `st_sequence` rows which refer to sequences on system tables,
// including any duplicates caused by the bug described above.
let sequence_rows = self
.table_scan(ST_SEQUENCE_ID)
.expect("`st_sequence` should exist")
.filter_map(|row_ref| {
// Read the table ID to which the sequence refers,
// in order to determine if this is a system sequence or not.
let table_id = row_ref
.read_col::<TableId>(StSequenceFields::TableId)
.expect("`st_sequence` row should conform to `st_sequence` schema");
// If this sequence refers to a system table, it may need a fixup.
// User tables' sequences will never need fixups.
table_id_is_reserved(table_id).then(|| {
let allocated = row_ref
.read_col::<i128>(StSequenceFields::Allocated)
.expect("`st_sequence` row should conform to `st_sequence` schema");
let sequence_id = row_ref
.read_col::<SequenceId>(StSequenceFields::SequenceId)
.expect("`st_sequence` row should conform to `st_sequence` schema");
StSequenceRowInfo {
allocated,
sequence_id,
row_pointer: row_ref.pointer(),
}
})
})
.collect::<Vec<_>>();
let (st_sequence, blob_store, ..) = self
.get_table_and_blob_store_mut(ST_SEQUENCE_ID)
.expect("`st_sequence` should exist");
// Track the row with the highest allocation for each sequence.
let mut highest_allocations: HashMap<SequenceId, (i128, RowPointer)> = HashMap::default();
for StSequenceRowInfo {
sequence_id,
allocated,
row_pointer,
} in sequence_rows
{
// For each `st_sequence` row which refers to a system table,
// if we've already seen a row for the same sequence,
// keep only the row with the higher allocation.
if let Some((prev_allocated, prev_row_pointer)) =
highest_allocations.insert(sequence_id, (allocated, row_pointer))
{
// We have a duplicate row. We want to keep whichever has the higher `allocated`,
// and delete the other.
let row_pointer_to_delete = if prev_allocated > allocated {
// The previous row has a higher allocation than the new row,
// so delete the new row and restore `previous` to `highest_allocations`.
highest_allocations.insert(sequence_id, (prev_allocated, prev_row_pointer));
row_pointer
} else {
// The previous row does not have a higher allocation than the new,
// so delete the previous row and keep the new one.
prev_row_pointer
};
st_sequence.delete(blob_store, row_pointer_to_delete, |_| ())
.expect("Duplicated `st_sequence` row at `row_pointer_to_delete` should be present in `st_sequence` during fixup");
}
}
}
/// Extremely delicate function to bootstrap the system tables.
/// Don't update this unless you know what you're doing.
pub(super) fn bootstrap_system_tables(&mut self, database_identity: Identity) -> Result<()> {
@@ -484,106 +396,6 @@ impl CommittedState {
Ok(sequence_state)
}
pub(super) fn build_indexes(&mut self) -> Result<()> {
let st_indexes = self.tables.get(&ST_INDEX_ID).unwrap();
let rows = st_indexes
.scan_rows(&self.blob_store)
.map(StIndexRow::try_from)
.collect::<Result<Vec<_>>>()?;
let st_constraints = self.tables.get(&ST_CONSTRAINT_ID).unwrap();
let unique_constraints: HashSet<(TableId, ColSet)> = st_constraints
.scan_rows(&self.blob_store)
.map(StConstraintRow::try_from)
.filter_map(Result::ok)
.filter_map(|constraint| match constraint.constraint_data {
StConstraintData::Unique { columns } => Some((constraint.table_id, columns)),
_ => None,
})
.collect();
for index_row in rows {
let index_id = index_row.index_id;
let table_id = index_row.table_id;
let (table, blob_store, index_id_map, _) = self
.get_table_and_blob_store_mut(table_id)
.expect("index should exist in committed state; cannot create it");
let algo: IndexAlgorithm = index_row.index_algorithm.into();
let columns: ColSet = algo.columns().into();
let is_unique = unique_constraints.contains(&(table_id, columns));
let index = table.new_index(&algo, is_unique)?;
// SAFETY: `index` was derived from `table`.
unsafe { table.insert_index(blob_store, index_id, index) }
.expect("rebuilding should not cause constraint violations");
index_id_map.insert(index_id, table_id);
}
Ok(())
}
pub(super) fn collect_ephemeral_tables(&mut self) -> Result<()> {
self.ephemeral_tables = self.ephemeral_tables()?.into_iter().collect();
Ok(())
}
fn ephemeral_tables(&self) -> Result<Vec<TableId>> {
let mut tables = vec![ST_VIEW_SUB_ID, ST_VIEW_ARG_ID];
let Some(st_view) = self.tables.get(&ST_VIEW_ID) else {
return Ok(tables);
};
let backing_tables = st_view
.scan_rows(&self.blob_store)
.map(|row_ref| {
let view_row = StViewRow::try_from(row_ref)?;
view_row
.table_id
.ok_or_else(|| DatastoreError::View(ViewError::TableNotFound(view_row.view_id)))
})
.collect::<Result<Vec<_>>>()?;
tables.extend(backing_tables);
Ok(tables)
}
/// After replaying all old transactions,
/// inserts and deletes into the system tables
/// might not be reflected in the schemas of the built tables.
/// So we must re-schema every built table.
pub(super) fn reschema_tables(&mut self) -> Result<()> {
// For already built tables, we need to reschema them to account for constraints et al.
let mut schemas = Vec::with_capacity(self.tables.len());
for table_id in self.tables.keys().copied() {
schemas.push(self.schema_for_table_raw(table_id)?);
}
for (table, schema) in self.tables.values_mut().zip(schemas) {
table.with_mut_schema(|s| *s = schema);
}
Ok(())
}
/// After replaying all old transactions, tables which have rows will
/// have been created in memory, but tables with no rows will not have
/// been created. This function ensures that they are created.
pub(super) fn build_missing_tables(&mut self) -> Result<()> {
// Find all ids of tables that are in `st_tables` but haven't been built.
let table_ids = self
.get_table(ST_TABLE_ID)
.unwrap()
.scan_rows(&self.blob_store)
.map(|r| r.read_col(StTableFields::TableId).unwrap())
.filter(|table_id| self.get_table(*table_id).is_none())
.collect::<Vec<_>>();
// Construct their schemas and insert tables for them.
for table_id in table_ids {
let schema = self.schema_for_table(table_id)?;
self.create_table(table_id, schema);
}
Ok(())
}
/// Returns an iterator doing a full table scan on `table_id`.
pub(super) fn table_scan<'a>(&'a self, table_id: TableId) -> Option<TableScanIter<'a>> {
Some(self.get_table(table_id)?.scan_rows(&self.blob_store))
@@ -1008,7 +820,7 @@ impl CommittedState {
Table::new(schema, SquashedOffset::COMMITTED_STATE)
}
fn create_table(&mut self, table_id: TableId, schema: Arc<TableSchema>) {
pub(super) fn create_table(&mut self, table_id: TableId, schema: Arc<TableSchema>) {
self.tables.insert(table_id, Self::make_table(schema));
}
@@ -3,7 +3,7 @@ use super::{
tx_state::TxState,
};
use crate::execution_context::{Workload, WorkloadType};
use crate::locking_tx_datastore::replay::{ErrorBehavior, Replay};
use crate::locking_tx_datastore::replay::{build_sequence_state, ErrorBehavior, Replay};
use crate::{
db_metrics::DB_METRICS,
error::{DatastoreError, TableError},
@@ -68,7 +68,7 @@ pub struct Locking {
// made private again.
pub committed_state: Arc<RwLock<CommittedState>>,
/// The state of sequence generation in this database.
sequence_state: Arc<Mutex<SequencesState>>,
pub(super) sequence_state: Arc<Mutex<SequencesState>>,
/// The identity of this database.
pub(crate) database_identity: Identity,
}
@@ -117,11 +117,7 @@ impl Locking {
commit_state.bootstrap_system_tables(database_identity)?;
// The database tables are now initialized with the correct data.
// Now we have to build our in memory structures.
{
let sequence_state = commit_state.build_sequence_state()?;
// Reset our sequence state so that they start in the right places.
*datastore.sequence_state.lock() = sequence_state;
}
build_sequence_state(&datastore, &mut commit_state)?;
// We don't want to build indexes here; we'll build those later,
// in `rebuild_state_after_replay`.
@@ -132,38 +128,6 @@ impl Locking {
Ok(datastore)
}
/// The purpose of this is to rebuild the state of the datastore
/// after having inserted all of the rows from the message log.
/// This is necessary because, for example, inserting a row into `st_table`
/// is not equivalent to calling `create_table`.
/// There may eventually be a better way to do this, but this will have to do for now.
pub fn rebuild_state_after_replay(&self) -> Result<()> {
let mut committed_state = self.committed_state.write_arc();
// Prior versions of `RelationalDb::migrate_system_tables` (defined in the `core` crate)
// initialized newly-created system sequences to `allocation: 4097`,
// while `committed_state::bootstrap_system_tables` sets `allocation: 4096`.
// This affected the system table migration which added
// `st_view_view_id_seq` and `st_view_arg_id_seq`.
// As a result, when replaying these databases' commitlogs without a snapshot,
// we will end up with two rows in `st_sequence` for each of these sequences,
// resulting in a unique constraint violation in `CommittedState::build_indexes`.
// We fix this by, for each system sequence, deleting all but the row with the highest allocation.
committed_state.fixup_delete_duplicate_system_sequence_rows();
// `build_missing_tables` must be called before indexes.
// Honestly this should maybe just be one big procedure.
// See John Carmack's philosophy on this.
committed_state.reschema_tables()?;
committed_state.build_missing_tables()?;
committed_state.build_indexes()?;
// Figure out where to pick up for each sequence.
*self.sequence_state.lock() = committed_state.build_sequence_state()?;
committed_state.collect_ephemeral_tables()?;
Ok(())
}
/// Obtain a [`spacetimedb_commitlog::Decoder`] suitable for replaying a
/// [`spacetimedb_durability::History`] onto the currently committed state.
///
@@ -242,11 +206,7 @@ impl Locking {
// Set the sequence state. In practice we will end up doing this again after replaying
// the commit log, but we do it here too just to avoid having an incorrectly restored
// snapshot.
{
let sequence_state = committed_state.build_sequence_state()?;
// Reset our sequence state so that they start in the right places.
*datastore.sequence_state.lock() = sequence_state;
}
build_sequence_state(&datastore, &mut committed_state)?;
// The next TX offset after restoring from a snapshot is one greater than the snapshotted offset.
committed_state.next_tx_offset = tx_offset + 1;
@@ -1,10 +1,12 @@
use super::committed_state::CommittedState;
use super::datastore::{Locking, Result};
use crate::db_metrics::DB_METRICS;
use crate::error::{DatastoreError, IndexError, TableError};
use crate::error::{DatastoreError, IndexError, TableError, ViewError};
use crate::locking_tx_datastore::state_view::{iter_st_column_for_table, StateView};
use crate::system_tables::{
is_built_in_meta_row, StColumnRow, StFields as _, StTableFields, StTableRow, ST_COLUMN_ID, ST_TABLE_ID,
is_built_in_meta_row, table_id_is_reserved, StColumnRow, StConstraintData, StConstraintRow, StFields as _,
StIndexRow, StSequenceFields, StTableFields, StTableRow, StViewRow, ST_COLUMN_ID, ST_CONSTRAINT_ID, ST_INDEX_ID,
ST_SEQUENCE_ID, ST_TABLE_ID, ST_VIEW_ARG_ID, ST_VIEW_ID, ST_VIEW_SUB_ID,
};
use anyhow::{anyhow, Context};
use core::cell::RefMut;
@@ -13,14 +15,15 @@ use parking_lot::RwLockWriteGuard;
use prometheus::core::{AtomicF64, GenericGauge};
use prometheus::IntGauge;
use spacetimedb_commitlog::payload::txdata;
use spacetimedb_data_structures::map::{HashSet, IntMap, IntSet};
use spacetimedb_data_structures::map::{HashMap, HashSet, IntMap, IntSet};
use spacetimedb_durability::History;
use spacetimedb_durability::Txdata;
use spacetimedb_lib::Identity;
use spacetimedb_primitives::{ColId, ColList, TableId};
use spacetimedb_primitives::{ColId, ColList, ColSet, SequenceId, TableId};
use spacetimedb_sats::algebraic_value::de::ValueDeserializer;
use spacetimedb_sats::buffer::BufReader;
use spacetimedb_sats::{bsatn, AlgebraicValue, Deserialize, ProductValue};
use spacetimedb_schema::def::IndexAlgorithm;
use spacetimedb_schema::schema::{ColumnSchema, TableSchema};
use spacetimedb_schema::table_name::TableName;
use spacetimedb_table::indexes::RowPointer;
@@ -80,8 +83,7 @@ pub fn apply_history(
.set((end_tx_offset - start_tx_offset) as _);
log::info!("[{database_identity}] DATABASE: applied transaction history");
drop(replay); // Necessary to avoid a deadlock.
datastore.rebuild_state_after_replay()?;
replay.committed_state().rebuild_state_after_replay(datastore)?;
log::info!("[{database_identity}] DATABASE: rebuilt state after replay");
Ok(())
@@ -479,6 +481,222 @@ impl<'cs> ReplayCommittedState<'cs> {
}
}
/// The purpose of this is to rebuild the state of the datastore
/// after having inserted all of the rows from the message log.
/// This is necessary because, for example, inserting a row into `st_table`
/// is not equivalent to calling `create_table`.
/// There may eventually be a better way to do this, but this will have to do for now.
pub fn rebuild_state_after_replay(&mut self, datastore: &Locking) -> Result<()> {
// Prior versions of `RelationalDb::migrate_system_tables` (defined in the `core` crate)
// initialized newly-created system sequences to `allocation: 4097`,
// while `committed_state::bootstrap_system_tables` sets `allocation: 4096`.
// This affected the system table migration which added
// `st_view_view_id_seq` and `st_view_arg_id_seq`.
// As a result, when replaying these databases' commitlogs without a snapshot,
// we will end up with two rows in `st_sequence` for each of these sequences,
// resulting in a unique constraint violation in `Self::build_indexes`.
// We fix this by, for each system sequence, deleting all but the row with the highest allocation.
self.fixup_delete_duplicate_system_sequence_rows();
// `build_missing_tables` must be called before indexes.
// Honestly this should maybe just be one big procedure.
// See John Carmack's philosophy on this.
self.reschema_tables()?;
self.build_missing_tables()?;
self.build_indexes()?;
self.collect_ephemeral_tables()?;
// Figure out where to pick up for each sequence.
build_sequence_state(datastore, self)?;
Ok(())
}
/// Delete all but the highest-allocation `st_sequence` row for each system sequence.
///
/// Prior versions of `RelationalDb::migrate_system_tables` (defined in the `core` crate)
/// initialized newly-created system sequences to `allocation: 4097`,
/// while [`CommittedState::bootstrap_system_tables`] sets `allocation: 4096`.
/// This affected the system table migration which added
/// `st_view_view_id_seq` and `st_view_arg_id_seq`.
/// As a result, when replaying these databases' commitlogs without a snapshot,
/// we will end up with two rows in `st_sequence` for each of these sequences,
/// resulting in a unique constraint violation in `CommittedState::build_indexes`.
/// We call this method in [`ReplayCommittedState::rebuild_state_after_replay`]
/// to avoid that unique constraint violation.
pub(super) fn fixup_delete_duplicate_system_sequence_rows(&mut self) {
struct StSequenceRowInfo {
sequence_id: SequenceId,
allocated: i128,
row_pointer: RowPointer,
}
// Get all the `st_sequence` rows which refer to sequences on system tables,
// including any duplicates caused by the bug described above.
let sequence_rows = self
.table_scan(ST_SEQUENCE_ID)
.expect("`st_sequence` should exist")
.filter_map(|row_ref| {
// Read the table ID to which the sequence refers,
// in order to determine if this is a system sequence or not.
let table_id = row_ref
.read_col::<TableId>(StSequenceFields::TableId)
.expect("`st_sequence` row should conform to `st_sequence` schema");
// If this sequence refers to a system table, it may need a fixup.
// User tables' sequences will never need fixups.
table_id_is_reserved(table_id).then(|| {
let allocated = row_ref
.read_col::<i128>(StSequenceFields::Allocated)
.expect("`st_sequence` row should conform to `st_sequence` schema");
let sequence_id = row_ref
.read_col::<SequenceId>(StSequenceFields::SequenceId)
.expect("`st_sequence` row should conform to `st_sequence` schema");
StSequenceRowInfo {
allocated,
sequence_id,
row_pointer: row_ref.pointer(),
}
})
})
.collect::<Vec<_>>();
let (st_sequence, blob_store, ..) = self
.get_table_and_blob_store_mut(ST_SEQUENCE_ID)
.expect("`st_sequence` should exist");
// Track the row with the highest allocation for each sequence.
let mut highest_allocations: HashMap<SequenceId, (i128, RowPointer)> = HashMap::default();
for StSequenceRowInfo {
sequence_id,
allocated,
row_pointer,
} in sequence_rows
{
// For each `st_sequence` row which refers to a system table,
// if we've already seen a row for the same sequence,
// keep only the row with the higher allocation.
if let Some((prev_allocated, prev_row_pointer)) =
highest_allocations.insert(sequence_id, (allocated, row_pointer))
{
// We have a duplicate row. We want to keep whichever has the higher `allocated`,
// and delete the other.
let row_pointer_to_delete = if prev_allocated > allocated {
// The previous row has a higher allocation than the new row,
// so delete the new row and restore `previous` to `highest_allocations`.
highest_allocations.insert(sequence_id, (prev_allocated, prev_row_pointer));
row_pointer
} else {
// The previous row does not have a higher allocation than the new,
// so delete the previous row and keep the new one.
prev_row_pointer
};
st_sequence.delete(blob_store, row_pointer_to_delete, |_| ())
.expect("Duplicated `st_sequence` row at `row_pointer_to_delete` should be present in `st_sequence` during fixup");
}
}
}
pub(super) fn build_indexes(&mut self) -> Result<()> {
let st_indexes = self.tables.get(&ST_INDEX_ID).unwrap();
let rows = st_indexes
.scan_rows(&self.blob_store)
.map(StIndexRow::try_from)
.collect::<Result<Vec<_>>>()?;
let st_constraints = self.tables.get(&ST_CONSTRAINT_ID).unwrap();
let unique_constraints: HashSet<(TableId, ColSet)> = st_constraints
.scan_rows(&self.blob_store)
.map(StConstraintRow::try_from)
.filter_map(Result::ok)
.filter_map(|constraint| match constraint.constraint_data {
StConstraintData::Unique { columns } => Some((constraint.table_id, columns)),
_ => None,
})
.collect();
for index_row in rows {
let index_id = index_row.index_id;
let table_id = index_row.table_id;
let (table, blob_store, index_id_map, _) = self
.get_table_and_blob_store_mut(table_id)
.expect("index should exist in committed state; cannot create it");
let algo: IndexAlgorithm = index_row.index_algorithm.into();
let columns: ColSet = algo.columns().into();
let is_unique = unique_constraints.contains(&(table_id, columns));
let index = table.new_index(&algo, is_unique)?;
// SAFETY: `index` was derived from `table`.
unsafe { table.insert_index(blob_store, index_id, index) }
.expect("rebuilding should not cause constraint violations");
index_id_map.insert(index_id, table_id);
}
Ok(())
}
pub(super) fn collect_ephemeral_tables(&mut self) -> Result<()> {
self.ephemeral_tables = self.ephemeral_tables()?.into_iter().collect();
Ok(())
}
fn ephemeral_tables(&self) -> Result<Vec<TableId>> {
let mut tables = vec![ST_VIEW_SUB_ID, ST_VIEW_ARG_ID];
let Some(st_view) = self.tables.get(&ST_VIEW_ID) else {
return Ok(tables);
};
let backing_tables = st_view
.scan_rows(&self.blob_store)
.map(|row_ref| {
let StViewRow { table_id, view_id, .. } = StViewRow::try_from(row_ref)?;
table_id.ok_or_else(|| DatastoreError::View(ViewError::TableNotFound(view_id)))
})
.collect::<Result<Vec<_>>>()?;
tables.extend(backing_tables);
Ok(tables)
}
/// After replaying all old transactions,
/// inserts and deletes into the system tables
/// might not be reflected in the schemas of the built tables.
/// So we must re-schema every built table.
pub(super) fn reschema_tables(&mut self) -> Result<()> {
// For already built tables, we need to reschema them to account for constraints et al.
let mut schemas = Vec::with_capacity(self.tables.len());
for table_id in self.tables.keys().copied() {
schemas.push(self.schema_for_table_raw(table_id)?);
}
for (table, schema) in self.tables.values_mut().zip(schemas) {
table.with_mut_schema(|s| *s = schema);
}
Ok(())
}
/// After replaying all old transactions, tables which have rows will
/// have been created in memory, but tables with no rows will not have
/// been created. This function ensures that they are created.
pub(super) fn build_missing_tables(&mut self) -> Result<()> {
// Find all ids of tables that are in `st_tables` but haven't been built.
let table_ids = self
.get_table(ST_TABLE_ID)
.unwrap()
.scan_rows(&self.blob_store)
.map(|r| r.read_col(StTableFields::TableId).unwrap())
.filter(|table_id| self.get_table(*table_id).is_none())
.collect::<Vec<_>>();
// Construct their schemas and insert tables for them.
for table_id in table_ids {
let schema = self.schema_for_table(table_id)?;
self.create_table(table_id, schema);
}
Ok(())
}
fn replay_insert(&mut self, table_id: TableId, schema: &Arc<TableSchema>, row: &ProductValue) -> Result<()> {
// Event table rows in the commitlog are preserved for future replay features
// but don't rebuild state — event tables have no committed state.
@@ -770,6 +988,13 @@ impl<'cs> ReplayCommittedState<'cs> {
}
}
pub(super) fn build_sequence_state(datastore: &Locking, cs: &mut CommittedState) -> Result<()> {
let sequence_state = cs.build_sequence_state()?;
// Reset our sequence state so that they start in the right places.
*datastore.sequence_state.lock() = sequence_state;
Ok(())
}
impl StateView for ReplayCommittedState<'_> {
/// Find the `st_table` row for `table_id`,
/// first inspecting [`Self::replay_table_updated`],