datastore: apply schema changes immediately to committed state.

This commit is contained in:
Mazdak Farrokhzad
2025-04-17 21:48:08 +02:00
parent 05e171ccec
commit 0af71cf7f3
15 changed files with 1170 additions and 1165 deletions
@@ -2,14 +2,10 @@ use super::{
datastore::Result,
delete_table::DeleteTable,
sequence::{Sequence, SequencesState},
state_view::{IterByColRangeTx, StateView},
tx_state::{IndexIdMap, RemovedIndexIdSet, TxState},
state_view::{IterByColRangeTx, IterTx, ScanIterByColRangeTx, StateView},
tx_state::{IndexIdMap, PendingSchemaChange, TxState},
IterByColEqTx,
};
use crate::{
db::datastore::locking_tx_datastore::state_view::{IterTx, ScanIterByColRangeTx},
error::IndexError,
};
use crate::{
db::{
datastore::{
@@ -25,7 +21,7 @@ use crate::{
},
db_metrics::DB_METRICS,
},
error::TableError,
error::{IndexError, TableError},
execution_context::ExecutionContext,
};
use anyhow::anyhow;
@@ -116,14 +112,14 @@ impl StateView for CommittedState {
cols: ColList,
range: R,
) -> Result<Self::IterByColRange<'_, R>> {
// TODO: Why does this unconditionally return a `Scan` iter,
// instead of trying to return a `CommittedIndex` iter?
// Answer: Because CommittedIndexIter::tx_state: Option<&'a TxState> need to be Some to read after reopen
Ok(IterByColRangeTx::Scan(ScanIterByColRangeTx::new(
self.iter(table_id)?,
cols,
range,
)))
match self.index_seek(table_id, &cols, &range) {
Some(iter) => Ok(IterByColRangeTx::Index(iter)),
None => Ok(IterByColRangeTx::Scan(ScanIterByColRangeTx::new(
self.iter(table_id)?,
cols,
range,
))),
}
}
fn iter_by_col_eq<'a, 'r>(
@@ -386,7 +382,7 @@ impl CommittedState {
// but before creating any user tables.
// The `sequence` we read out of `row_ref` above, and used to construct `seq`,
// will correctly reflect the state after creating user tables.
sequence_state.insert(seq.id(), seq);
sequence_state.insert(seq);
}
Ok(())
}
@@ -412,7 +408,7 @@ impl CommittedState {
for index_row in rows {
let index_id = index_row.index_id;
let table_id = index_row.table_id;
let Some((table, blob_store)) = self.get_table_and_blob_store(table_id) else {
let (Some(table), blob_store, index_id_map) = self.get_table_and_blob_store_mut(table_id) else {
panic!("Cannot create index for table which doesn't exist in committed state");
};
let algo: IndexAlgorithm = index_row.index_algorithm.into();
@@ -422,7 +418,7 @@ impl CommittedState {
let index = table.new_index(&algo, is_unique)?;
// SAFETY: `index` was derived from `table`.
unsafe { table.insert_index(blob_store, index_id, index) };
self.index_id_map.insert(index_id, table_id);
index_id_map.insert(index_id, table_id);
}
Ok(())
}
@@ -547,9 +543,6 @@ impl CommittedState {
pub(super) fn merge(&mut self, tx_state: TxState, ctx: &ExecutionContext) -> TxData {
let mut tx_data = TxData::default();
// First, merge index id fast-lookup map changes and delete indices.
self.merge_index_map(tx_state.index_id_map, tx_state.index_id_map_removals.as_deref());
// First, apply deletes. This will free up space in the committed tables.
self.merge_apply_deletes(&mut tx_data, tx_state.delete_tables);
@@ -569,7 +562,7 @@ impl CommittedState {
fn merge_apply_deletes(&mut self, tx_data: &mut TxData, delete_tables: BTreeMap<TableId, DeleteTable>) {
for (table_id, row_ptrs) in delete_tables {
if let Some((table, blob_store)) = self.get_table_and_blob_store(table_id) {
if let (Some(table), blob_store, _) = self.get_table_and_blob_store_mut(table_id) {
let mut deletes = Vec::with_capacity(row_ptrs.len());
// Note: we maintain the invariant that the delete_tables
@@ -631,19 +624,7 @@ impl CommittedState {
tx_data.set_inserts_for_table(table_id, table_name, inserts.into());
}
let (schema, indexes, pages) = tx_table.consume_for_merge();
// Add all newly created indexes to the committed state.
for (index_id, mut index) in indexes {
if !commit_table.indexes.contains_key(&index_id) {
index.clear();
// SAFETY: `tx_table` is derived from `commit_table`,
// so they have the same row type.
// This entails that all indices in `tx_table`
// were constructed with the same row type/layout as `commit_table`.
unsafe { commit_table.insert_index(commit_blob_store, index_id, index) };
}
}
let (schema, _indexes, pages) = tx_table.consume_for_merge();
// The schema may have been modified in the transaction.
// Update this last to placate borrowck and avoid a clone.
@@ -655,43 +636,104 @@ impl CommittedState {
}
}
fn merge_index_map(&mut self, index_id_map: IndexIdMap, index_id_map_removals: Option<&RemovedIndexIdSet>) {
// Remove indices that tx-state removed.
// It's not necessarily the case that the index already existed in the committed state.
for (index_id, table_id) in index_id_map_removals
.into_iter()
.flatten()
.filter_map(|index_id| self.index_id_map.remove(index_id).map(|x| (*index_id, x)))
{
assert!(self
.tables
.get_mut(&table_id)
.expect("table to delete index from should exist")
.delete_index(&self.blob_store, index_id));
/// Rolls back the changes immediately made to the committed state during a transaction.
pub(super) fn rollback(&mut self, seq_state: &mut SequencesState, tx_state: TxState) {
// Roll back the changes in the reverse order in which they were made
// so that e.g., the last change is undone first.
for change in tx_state.pending_schema_changes.into_iter().rev() {
self.rollback_pending_schema_change(seq_state, change);
}
}
fn rollback_pending_schema_change(
&mut self,
seq_state: &mut SequencesState,
change: PendingSchemaChange,
) -> Option<()> {
use PendingSchemaChange::*;
match change {
// An index was removed. Add it back.
IndexRemoved(table_id, index_id, table_index, index_schema) => {
let table = self.tables.get_mut(&table_id)?;
// SAFETY: `table_index` was derived from `table`.
unsafe { table.add_index(index_id, table_index) };
table.with_mut_schema(|s| s.update_index(index_schema));
self.index_id_map.insert(index_id, table_id);
}
// An index was added. Remove it.
IndexAdded(table_id, index_id, pointer_map) => {
let table = self.tables.get_mut(&table_id)?;
table.delete_index(&self.blob_store, index_id, pointer_map);
table.with_mut_schema(|s| s.remove_index(index_id));
self.index_id_map.remove(&index_id);
}
// A table was removed. Add it back.
TableRemoved(table_id, table) => {
// We don't need to deal with sub-components.
// That is, we don't need to add back indices and such.
// Instead, there will be separate pending schema changes like `IndexRemoved`.
self.tables.insert(table_id, table);
}
// A table was added. Remove it.
TableAdded(table_id) => {
// We don't need to deal with sub-components.
// That is, we don't need to remove indices and such.
// Instead, there will be separate pending schema changes like `IndexAdded`.
self.tables.remove(&table_id);
}
// A table's access was changed. Change back to the old one.
TableAlterAccess(table_id, access) => {
let table = self.tables.get_mut(&table_id)?;
table.with_mut_schema(|s| s.table_access = access);
}
// A constraint was removed. Add it back.
ConstraintRemoved(table_id, constraint_schema) => {
let table = self.tables.get_mut(&table_id)?;
table.with_mut_schema(|s| s.update_constraint(constraint_schema));
}
// A constraint was added. Remove it.
ConstraintAdded(table_id, constraint_id) => {
let table = self.tables.get_mut(&table_id)?;
table.with_mut_schema(|s| s.remove_constraint(constraint_id));
}
// A sequence was removed. Add it back.
SequenceRemoved(table_id, seq, schema) => {
let table = self.tables.get_mut(&table_id)?;
table.with_mut_schema(|s| s.update_sequence(schema));
seq_state.insert(seq);
}
// A sequence was added. Remove it.
SequenceAdded(table_id, sequence_id) => {
let table = self.tables.get_mut(&table_id)?;
table.with_mut_schema(|s| s.remove_sequence(sequence_id));
seq_state.remove(sequence_id);
}
}
// Add the ones tx-state added.
self.index_id_map.extend(index_id_map);
Some(())
}
pub(super) fn get_table(&self, table_id: TableId) -> Option<&Table> {
self.tables.get(&table_id)
}
pub(super) fn get_table_mut(&mut self, table_id: TableId) -> (Option<&mut Table>, &PagePool) {
(self.tables.get_mut(&table_id), &self.page_pool)
#[allow(clippy::unnecessary_lazy_evaluations)]
pub fn get_table_and_blob_store(&self, table_id: TableId) -> Result<CommitTableForInsertion<'_>> {
let table = self
.get_table(table_id)
.ok_or_else(|| TableError::IdNotFoundState(table_id))?;
Ok((table, &self.blob_store as &dyn BlobStore, &self.index_id_map))
}
pub fn get_table_and_blob_store_immutable(&self, table_id: TableId) -> Option<(&Table, &dyn BlobStore)> {
self.tables
.get(&table_id)
.map(|tbl| (tbl, &self.blob_store as &dyn BlobStore))
}
pub(super) fn get_table_and_blob_store(&mut self, table_id: TableId) -> Option<(&mut Table, &mut dyn BlobStore)> {
self.tables
.get_mut(&table_id)
.map(|tbl| (tbl, &mut self.blob_store as &mut dyn BlobStore))
pub(super) fn get_table_and_blob_store_mut(
&mut self,
table_id: TableId,
) -> (Option<&mut Table>, &mut dyn BlobStore, &mut IndexIdMap) {
(
self.tables.get_mut(&table_id),
&mut self.blob_store as &mut dyn BlobStore,
&mut self.index_id_map,
)
}
fn make_table(schema: Arc<TableSchema>) -> Table {
@@ -712,7 +754,7 @@ impl CommittedState {
.entry(table_id)
.or_insert_with(|| Self::make_table(schema.clone()));
let blob_store = &mut self.blob_store;
let pool = &mut self.page_pool;
let pool = &self.page_pool;
(table, blob_store, pool)
}
@@ -750,25 +792,4 @@ impl CommittedState {
}
}
pub struct CommittedIndexIterWithDeletedMutTx<'a> {
committed_rows: IndexScanRangeIter<'a>,
del_table: &'a DeleteTable,
}
impl<'a> CommittedIndexIterWithDeletedMutTx<'a> {
pub(super) fn new(committed_rows: IndexScanRangeIter<'a>, del_table: &'a DeleteTable) -> Self {
Self {
committed_rows,
del_table,
}
}
}
impl<'a> Iterator for CommittedIndexIterWithDeletedMutTx<'a> {
type Item = RowRef<'a>;
fn next(&mut self) -> Option<Self::Item> {
self.committed_rows
.find(|row_ref| !self.del_table.contains(row_ref.pointer()))
}
}
pub(super) type CommitTableForInsertion<'a> = (&'a Table, &'a dyn BlobStore, &'a IndexIdMap);
@@ -638,7 +638,8 @@ impl MutTxDatastore for Locking {
index_id: IndexId,
row: &[u8],
) -> Result<(ColList, RowRef<'a>, UpdateFlags)> {
tx.update(table_id, index_id, row)
let (gens, row_ref, update_flags) = tx.update(table_id, index_id, row)?;
Ok((gens, row_ref.collapse(), update_flags))
}
fn metadata_mut_tx(&self, tx: &Self::MutTx) -> Result<Option<Metadata>> {
@@ -2358,7 +2359,7 @@ mod tests {
// the delete should first mark the committed row as deleted in the delete tables,
// and then it should remove it from the delete tables upon insertion,
// rather than actually inserting it in the tx state.
// So the second transaction should be observationally a no-op.s
// So the second transaction should be observationally a no-op.
// There was a bug in the datastore that did not respect this in the presence of a unique index.
let (deleted_1, tx_data_1) = update(&datastore)?;
let (deleted_2, tx_data_2) = update(&datastore)?;
File diff suppressed because it is too large Load Diff
@@ -123,8 +123,8 @@ impl SequencesState {
self.sequences.get_mut(&seq_id)
}
pub(super) fn insert(&mut self, seq_id: SequenceId, seq: Sequence) {
self.sequences.insert(seq_id, seq);
pub(super) fn insert(&mut self, seq: Sequence) {
self.sequences.insert(seq.id(), seq);
}
pub(super) fn remove(&mut self, seq_id: SequenceId) -> Option<Sequence> {
@@ -1,5 +1,5 @@
use super::{committed_state::CommittedState, datastore::Result, delete_table::DeleteTable, tx_state::TxState};
use crate::db::datastore::locking_tx_datastore::committed_state::CommittedIndexIterWithDeletedMutTx;
use super::mut_tx::{FilterDeleted, IndexScanRanged};
use super::{committed_state::CommittedState, datastore::Result, tx_state::TxState};
use crate::{
db::datastore::system_tables::{
StColumnFields, StColumnRow, StConstraintFields, StConstraintRow, StIndexFields, StIndexRow, StScheduledFields,
@@ -157,35 +157,35 @@ pub struct IterMutTx<'a> {
}
impl<'a> IterMutTx<'a> {
pub(super) fn new(table_id: TableId, tx_state: &'a TxState, committed_state: &'a CommittedState) -> Self {
pub(super) fn new(table_id: TableId, tx_state: &'a TxState, committed_state: &'a CommittedState) -> Result<Self> {
// If the table exist, the committed state has it as we apply schema changes immediately.
let Some(commit_table) = committed_state.get_table(table_id) else {
return Err(TableError::IdNotFound(SystemTable::st_table, table_id.0).into());
};
// I can neither confirm nor deny that we have a tx insert table.
let tx_state_ins = tx_state
.insert_tables
.get(&table_id)
.map(|table| (table, &tx_state.blob_store));
let stage = if let Some(table) = committed_state.tables.get(&table_id) {
// The committed state has changes for this table.
let iter = table.scan_rows(&committed_state.blob_store);
if let Some(del_tables) = tx_state.get_delete_table(table_id) {
// There are deletes in the tx state
// so we must exclude those (1b).
ScanStage::CommittedWithTxDeletes { iter, del_tables }
} else {
// There are no deletes in the tx state
// so we don't need to care about those (1a).
ScanStage::CommittedNoTxDeletes { iter }
}
let iter = commit_table.scan_rows(&committed_state.blob_store);
let stage = if let Some(deletes) = tx_state.get_delete_table(table_id) {
// There are deletes in the tx state
// so we must exclude those (1b).
let iter = FilterDeleted { iter, deletes };
ScanStage::CommittedWithTxDeletes { iter }
} else {
ScanStage::Continue
// There are no deletes in the tx state
// so we don't need to care about those (1a).
ScanStage::CommittedNoTxDeletes { iter }
};
Self { tx_state_ins, stage }
Ok(Self { tx_state_ins, stage })
}
}
enum ScanStage<'a> {
/// Continue to the next stage.
Continue,
/// Yielding rows from the current tx.
CurrentTx { iter: TableScanIter<'a> },
/// Yielding rows from the committed state
@@ -194,10 +194,7 @@ enum ScanStage<'a> {
/// Yielding rows from the committed state
/// but there are deleted rows in the tx state,
/// so we must check against those.
CommittedWithTxDeletes {
iter: TableScanIter<'a>,
del_tables: &'a DeleteTable,
},
CommittedWithTxDeletes { iter: FilterDeleted<'a, TableScanIter<'a>> },
}
impl<'a> Iterator for IterMutTx<'a> {
@@ -207,19 +204,11 @@ impl<'a> Iterator for IterMutTx<'a> {
fn next(&mut self) -> Option<Self::Item> {
// The finite state machine goes:
//
// Continue
// |
// |--> CurrentTx -------------------------------\
// | ^ |
// | \--------------------\ |
// | ^ |
// |--> CommittedNoTxDeletes ----|---------------\
// | ^ v
// \--> CommittedWithTxDeletes --|------/----> Stop
// CommittedNoTxDeletes ------\
// |----> CurrentTx ---> STOP
// CommittedWithTxDeletes ----/
loop {
match &mut self.stage {
ScanStage::Continue => {}
ScanStage::CommittedNoTxDeletes { iter } => {
// (1a) Go through the committed state for this table
// but do not consider deleted rows.
@@ -227,7 +216,7 @@ impl<'a> Iterator for IterMutTx<'a> {
return next;
}
}
ScanStage::CommittedWithTxDeletes { iter, del_tables } => {
ScanStage::CommittedWithTxDeletes { iter } => {
// (1b) Check the committed row's state in the current tx.
// If it's been deleted, skip it.
// If it's still present, yield it.
@@ -254,7 +243,7 @@ impl<'a> Iterator for IterMutTx<'a> {
//
// As a result, in MVCC, this branch will need to check if the `row_ref`
// also exists in the `tx_state.insert_tables` and ensure it is yielded only once.
if let next @ Some(_) = iter.find(|row_ref| !del_tables.contains(row_ref.pointer())) {
if let next @ Some(_) = iter.next() {
return next;
}
}
@@ -298,70 +287,6 @@ impl<'a> Iterator for IterTx<'a> {
}
}
pub struct IndexSeekIterIdMutTx<'a> {
pub(super) inserted_rows: IndexScanRangeIter<'a>,
pub(super) committed_rows: Option<IndexScanRangeIter<'a>>,
}
impl<'a> Iterator for IndexSeekIterIdMutTx<'a> {
type Item = RowRef<'a>;
fn next(&mut self) -> Option<Self::Item> {
if let Some(row_ref) = self.inserted_rows.next() {
return Some(row_ref);
}
self.committed_rows.as_mut().and_then(|i| i.next())
}
}
pub struct IndexSeekIterIdWithDeletedMutTx<'a> {
pub(super) inserted_rows: IndexScanRangeIter<'a>,
pub(super) committed_rows: Option<IndexScanRangeIter<'a>>,
pub(super) del_table: &'a DeleteTable,
}
impl<'a> Iterator for IndexSeekIterIdWithDeletedMutTx<'a> {
type Item = RowRef<'a>;
fn next(&mut self) -> Option<Self::Item> {
if let Some(row_ref) = self.inserted_rows.next() {
return Some(row_ref);
}
if let Some(row_ref) = self
.committed_rows
.as_mut()
// NOTE for future MVCC implementors:
// In MVCC, it is no longer valid to elide inserts in this way.
// When a transaction inserts a row, that row *must* appear in its insert tables,
// even if the row is already present in the committed state.
//
// Imagine a chain of committed but un-squashed transactions:
// `Committed 0: Insert Row A` - `Committed 1: Delete Row A`
// where `Committed 1` happens after `Committed 0`.
// Imagine a transaction `Running 2: Insert Row A`,
// which began before `Committed 1` was committed.
// Because `Committed 1` has since been committed,
// `Running 2` *must* happen after `Committed 1`.
// Therefore, the correct sequence of events is:
// - Insert Row A
// - Delete Row A
// - Insert Row A
// This is impossible to recover if `Running 2` elides its insert.
//
// As a result, in MVCC, this branch will need to check if the `row_ref`
// also exists in the `tx_state.insert_tables` and ensure it is yielded only once.
.and_then(|i| i.find(|row_ref| !self.del_table.contains(row_ref.pointer())))
{
// TODO(metrics): This doesn't actually fetch a row.
// Move this counter to `RowRef::read_row`.
// self.num_committed_rows_fetched += 1;
return Some(row_ref);
}
None
}
}
/// An [IterByColRangeTx] for an individual column value.
pub type IterByColEqTx<'a, 'r> = IterByColRangeTx<'a, &'r AlgebraicValue>;
/// An [IterByColRangeMutTx] for an individual column value.
@@ -372,13 +297,8 @@ pub enum IterByColRangeTx<'a, R: RangeBounds<AlgebraicValue>> {
/// When the column in question does not have an index.
Scan(ScanIterByColRangeTx<'a, R>),
/// When the column has an index, and the table
/// has been modified this transaction.
Index(IndexSeekIterIdMutTx<'a>),
/// When the column has an index, and the table
/// has not been modified in this transaction.
CommittedIndex(IndexScanRangeIter<'a>),
/// When the column has an index.
Index(IndexScanRangeIter<'a>),
}
impl<'a, R: RangeBounds<AlgebraicValue>> Iterator for IterByColRangeTx<'a, R> {
@@ -386,9 +306,8 @@ impl<'a, R: RangeBounds<AlgebraicValue>> Iterator for IterByColRangeTx<'a, R> {
fn next(&mut self) -> Option<Self::Item> {
match self {
IterByColRangeTx::Scan(range) => range.next(),
IterByColRangeTx::Index(range) => range.next(),
IterByColRangeTx::CommittedIndex(seek) => seek.next(),
IterByColRangeTx::Scan(iter) => iter.next(),
IterByColRangeTx::Index(iter) => iter.next(),
}
}
}
@@ -398,21 +317,8 @@ pub enum IterByColRangeMutTx<'a, R: RangeBounds<AlgebraicValue>> {
/// When the column in question does not have an index.
Scan(ScanIterByColRangeMutTx<'a, R>),
/// When the column has an index, and the table
/// has been modified this transaction.
Index(IndexSeekIterIdMutTx<'a>),
/// When the column has an index, and the table
/// has been modified this transaction, and there are deleted rows.
IndexWithDeletes(IndexSeekIterIdWithDeletedMutTx<'a>),
/// When the column has an index, and the table
/// has not been modified in this transaction.
CommittedIndex(IndexScanRangeIter<'a>),
/// When the column has an index, and the table
/// has not been modified in this transaction.
CommittedIndexWithDeletes(CommittedIndexIterWithDeletedMutTx<'a>),
/// When the column has an index.
Index(IndexScanRanged<'a>),
}
impl<'a, R: RangeBounds<AlgebraicValue>> Iterator for IterByColRangeMutTx<'a, R> {
@@ -422,9 +328,6 @@ impl<'a, R: RangeBounds<AlgebraicValue>> Iterator for IterByColRangeMutTx<'a, R>
match self {
IterByColRangeMutTx::Scan(range) => range.next(),
IterByColRangeMutTx::Index(range) => range.next(),
IterByColRangeMutTx::IndexWithDeletes(range) => range.next(),
IterByColRangeMutTx::CommittedIndex(seek) => seek.next(),
IterByColRangeMutTx::CommittedIndexWithDeletes(seek) => seek.next(),
}
}
}
@@ -436,6 +339,7 @@ pub struct ScanIterByColRangeTx<'a, R: RangeBounds<AlgebraicValue>> {
}
impl<'a, R: RangeBounds<AlgebraicValue>> ScanIterByColRangeTx<'a, R> {
// TODO(perf, centril): consider taking `cols` by reference.
pub(super) fn new(scan_iter: IterTx<'a>, cols: ColList, range: R) -> Self {
Self { scan_iter, cols, range }
}
@@ -463,6 +367,7 @@ pub struct ScanIterByColRangeMutTx<'a, R: RangeBounds<AlgebraicValue>> {
}
impl<'a, R: RangeBounds<AlgebraicValue>> ScanIterByColRangeMutTx<'a, R> {
// TODO(perf, centril): consider taking `cols` by reference.
pub(super) fn new(scan_iter: IterMutTx<'a>, cols: ColList, range: R) -> Self {
Self { scan_iter, cols, range }
}
@@ -1,7 +1,6 @@
use super::datastore::TxMetrics;
use super::{
committed_state::CommittedState,
datastore::Result,
datastore::{Result, TxMetrics},
state_view::{IterByColRangeTx, StateView},
IterByColEqTx, SharedReadGuard,
};
@@ -69,12 +68,8 @@ impl StateView for TxId {
cols: ColList,
range: R,
) -> Result<Self::IterByColRange<'_, R>> {
match self.committed_state_shared_lock.index_seek(table_id, &cols, &range) {
Some(committed_rows) => Ok(IterByColRangeTx::CommittedIndex(committed_rows)),
None => self
.committed_state_shared_lock
.iter_by_col_range(table_id, cols, range),
}
self.committed_state_shared_lock
.iter_by_col_range(table_id, cols, range)
}
fn iter_by_col_eq<'a, 'r>(
@@ -1,19 +1,23 @@
use super::delete_table::DeleteTable;
use super::{delete_table::DeleteTable, sequence::Sequence};
use core::ops::RangeBounds;
use spacetimedb_data_structures::map::{IntMap, IntSet};
use spacetimedb_primitives::{ColList, IndexId, TableId};
use spacetimedb_data_structures::map::IntMap;
use spacetimedb_lib::db::auth::StAccess;
use spacetimedb_primitives::{ColList, ConstraintId, IndexId, SequenceId, TableId};
use spacetimedb_sats::AlgebraicValue;
use spacetimedb_schema::schema::{ConstraintSchema, IndexSchema, SequenceSchema};
use spacetimedb_table::{
blob_store::{BlobStore, HashMapBlobStore},
indexes::{RowPointer, SquashedOffset},
pointer_map::PointerMap,
static_assert_size,
table::{IndexScanRangeIter, RowRef, Table, TableAndIndex},
table_index::TableIndex,
};
use std::collections::{btree_map, BTreeMap};
use thin_vec::ThinVec;
/// A mapping to find the actual index given an `IndexId`.
pub(super) type IndexIdMap = IntMap<IndexId, TableId>;
pub(super) type RemovedIndexIdSet = IntSet<IndexId>;
/// `TxState` tracks all of the modifications made during a particular transaction.
/// Rows inserted during a transaction will be added to insert_tables, and similarly,
@@ -67,16 +71,47 @@ pub(super) struct TxState {
/// - Traverse all rows in the `insert_tables` and free each of their blobs during rollback.
pub(super) blob_store: HashMapBlobStore,
/// Provides fast lookup for index id -> an index.
pub(super) index_id_map: IndexIdMap,
/// Lists all the `IndexId` that are to be removed from `CommittedState::index_id_map`.
// This is in an `Option<Box<>>` to reduce the size of `TxState` - it's very uncommon
// that this would be created.
pub(super) index_id_map_removals: Option<Box<RemovedIndexIdSet>>,
/// All of the immediately applied schema changes to the committed state during this transaction.
///
/// This is stored as a `ThinVec` as it would be very uncommon to add anything to this list.
pub(super) pending_schema_changes: ThinVec<PendingSchemaChange>,
}
static_assert_size!(TxState, 120);
/// A pending schema change is a change to a `TableSchema`
/// that has been applied immediately to the [`CommittedState`](super::committed_state::CommittedState)
/// and which needs to be reverted if the transaction fails.
///
/// The goal here is that by applying changes immediately,
/// most of the datastore does not have to care about schema change transactionality.
/// The places that do need to care about changes are those that make them, and merge/rollback.
/// Architecting this way should benefit performance both during transactions and merge.
/// On rollback, it should be fairly cheap to e.g., just re-add an index or drop it on the floor.
pub(super) enum PendingSchemaChange {
/// The [`TableIndex`] / [`IndexSchema`] with `IndexId`
/// was removed from the table with [`TableId`].
IndexRemoved(TableId, IndexId, TableIndex, IndexSchema),
/// The index with [`IndexId`] was added.
/// If adding this index caused the pointer map to be removed,
/// it will be present here.
IndexAdded(TableId, IndexId, Option<PointerMap>),
/// The [`Table`] with [`TableId`] was removed.
TableRemoved(TableId, Table),
/// The table with [`TableId`] was added.
TableAdded(TableId),
/// The access of the table with [`TableId`] was changed.
/// The old access was stored.
TableAlterAccess(TableId, StAccess),
/// The constraint with [`ConstraintSchema`] was removed from the table with [`TableId`].
ConstraintRemoved(TableId, ConstraintSchema),
/// The constraint with [`ConstraintId`] was added to the table with [`TableId`].
ConstraintAdded(TableId, ConstraintId),
/// The [`Sequence`] with [`SequenceSchema`] was removed from the table with [`TableId`].
SequenceRemoved(TableId, Sequence, SequenceSchema),
/// The sequence with [`SequenceId`] was added to the table with [`TableId`].
SequenceAdded(TableId, SequenceId),
}
static_assert_size!(TxState, 88);
impl TxState {
/// Returns the row count in insert tables
@@ -107,11 +142,6 @@ impl TxState {
.map(|i| i.seek_range(range))
}
/// Returns the table associated with the given `index_id`, if any.
pub(super) fn get_table_for_index(&self, index_id: IndexId) -> Option<TableId> {
self.index_id_map.get(&index_id).copied()
}
/// Returns the table for `table_id` combined with the index for `index_id`, if both exist.
pub(super) fn get_index_by_id_with_table(&self, table_id: TableId, index_id: IndexId) -> Option<TableAndIndex<'_>> {
self.insert_tables
@@ -173,28 +203,22 @@ impl TxState {
Some((table, blob_store))
}
pub(super) fn get_table_and_blob_store_or_maybe_create_from<'this>(
&'this mut self,
pub(super) fn get_table_and_blob_store_or_create_from(
&mut self,
table_id: TableId,
template: Option<&Table>,
) -> Option<(
&'this mut Table,
&'this mut dyn BlobStore,
&'this mut IndexIdMap,
&'this mut DeleteTable,
)> {
template: &Table,
) -> TxTableForInsertion<'_> {
let insert_tables = &mut self.insert_tables;
let blob_store = &mut self.blob_store;
let idx_map = &mut self.index_id_map;
let table = match insert_tables.entry(table_id) {
btree_map::Entry::Vacant(e) => {
let new_table = template?.clone_structure(SquashedOffset::TX_STATE);
let new_table = template.clone_structure(SquashedOffset::TX_STATE);
e.insert(new_table)
}
btree_map::Entry::Occupied(e) => e.into_mut(),
};
let delete_table = get_delete_table_mut(&mut self.delete_tables, table_id, table);
Some((table, blob_store, idx_map, delete_table))
(table, blob_store, delete_table)
}
/// Assumes that the insert and delete tables exist for `table_id` and fetches them.
@@ -202,10 +226,7 @@ impl TxState {
/// # Safety
///
/// The insert and delete tables must exist.
pub unsafe fn assume_present_get_mut_table(
&mut self,
table_id: TableId,
) -> (&mut Table, &mut dyn BlobStore, &mut DeleteTable) {
pub unsafe fn assume_present_get_mut_table(&mut self, table_id: TableId) -> TxTableForInsertion<'_> {
let tx_blob_store: &mut dyn BlobStore = &mut self.blob_store;
let tx_table = self.insert_tables.get_mut(&table_id);
// SAFETY: we successfully got a `tx_table` before and haven't removed it since.
@@ -217,6 +238,8 @@ impl TxState {
}
}
pub(super) type TxTableForInsertion<'a> = (&'a mut Table, &'a mut dyn BlobStore, &'a mut DeleteTable);
fn get_delete_table_mut<'a>(
delete_tables: &'a mut BTreeMap<TableId, DeleteTable>,
table_id: TableId,
+1 -1
View File
@@ -353,7 +353,7 @@ static_assert_size!(
fn(AlgebraicValue) -> Result<IterByColRangeTx<'static, AlgebraicValue>, DBError>,
IterByColRangeTx<'static, AlgebraicValue>,
>,
232
144
);
static_assert_size!(
IndexSemiJoinLeft<
+1 -1
View File
@@ -87,7 +87,7 @@ impl ProductValue {
/// (including zero) fields, otherwise it will consist of a single [AlgebraicValue].
///
/// **Parameters:**
/// - `cols`: A [ColList] containing the indexes of fields to be projected.s
/// - `cols`: A [ColList] containing the indexes of fields to be projected.
pub fn project(&self, cols: &ColList) -> Result<AlgebraicValue, InvalidFieldError> {
if let Some(head) = cols.as_singleton() {
self.get_field(head.idx(), None).cloned()
+20 -11
View File
@@ -6,6 +6,7 @@
// TODO(1.0): change all the `Box<str>`s in this file to `Identifier`.
// This doesn't affect the ABI so can wait until 1.0.
use core::mem;
use itertools::Itertools;
use spacetimedb_lib::db::auth::{StAccess, StTableType};
use spacetimedb_lib::db::error::{DefType, SchemaError};
@@ -191,11 +192,13 @@ impl TableSchema {
&self.columns
}
/// Clear all the [Self::indexes], [Self::sequences] & [Self::constraints]
pub fn clear_adjacent_schemas(&mut self) {
self.indexes.clear();
self.sequences.clear();
self.constraints.clear();
/// Extracts all the [Self::indexes], [Self::sequences], and [Self::constraints].
pub fn take_adjacent_schemas(&mut self) -> (Vec<IndexSchema>, Vec<SequenceSchema>, Vec<ConstraintSchema>) {
(
mem::take(&mut self.indexes),
mem::take(&mut self.sequences),
mem::take(&mut self.constraints),
)
}
// Crud operation on adjacent schemas
@@ -210,8 +213,8 @@ impl TableSchema {
}
/// Removes the given `sequence_id`
pub fn remove_sequence(&mut self, sequence_id: SequenceId) {
self.sequences.retain(|x| x.sequence_id != sequence_id)
pub fn remove_sequence(&mut self, sequence_id: SequenceId) -> Option<SequenceSchema> {
find_remove(&mut self.sequences, |x| x.sequence_id == sequence_id)
}
/// Add OR replace the [IndexSchema]
@@ -224,8 +227,8 @@ impl TableSchema {
}
/// Removes the given `index_id`
pub fn remove_index(&mut self, index_id: IndexId) {
self.indexes.retain(|x| x.index_id != index_id)
pub fn remove_index(&mut self, index_id: IndexId) -> Option<IndexSchema> {
find_remove(&mut self.indexes, |x| x.index_id == index_id)
}
/// Add OR replace the [ConstraintSchema]
@@ -242,8 +245,8 @@ impl TableSchema {
}
/// Removes the given `index_id`
pub fn remove_constraint(&mut self, constraint_id: ConstraintId) {
self.constraints.retain(|x| x.constraint_id != constraint_id)
pub fn remove_constraint(&mut self, constraint_id: ConstraintId) -> Option<ConstraintSchema> {
find_remove(&mut self.constraints, |x| x.constraint_id == constraint_id)
}
/// Concatenate the column names from the `columns`
@@ -532,6 +535,12 @@ impl TableSchema {
}
}
/// Removes and returns the first element in `vec` satisfying `predicate`,
/// or `None` if no element matches.
///
/// Preserves the relative order of the remaining elements
/// (uses `Vec::remove`, not `swap_remove`), which callers rely on
/// for stable schema listings.
///
/// The predicate bound is `FnMut`, matching [`Iterator::position`],
/// so stateful closures are accepted as well (`Fn` closures still work).
fn find_remove<T>(vec: &mut Vec<T>, predicate: impl FnMut(&T) -> bool) -> Option<T> {
    let pos = vec.iter().position(predicate)?;
    Some(vec.remove(pos))
}
/// Like `assert_eq!` for `anyhow`, but `$msg` is just a string, not a format string.
macro_rules! ensure_eq {
($a:expr, $b:expr, $msg:expr) => {
+36 -39
View File
@@ -40,7 +40,11 @@ use spacetimedb_sats::{
ser::{Serialize, Serializer},
u256, AlgebraicValue, ProductType, ProductValue,
};
use spacetimedb_schema::{def::IndexAlgorithm, schema::TableSchema, type_for_generate::PrimitiveType};
use spacetimedb_schema::{
def::IndexAlgorithm,
schema::{IndexSchema, TableSchema},
type_for_generate::PrimitiveType,
};
use std::{
collections::{btree_map, BTreeMap},
sync::Arc,
@@ -1062,8 +1066,8 @@ impl Table {
/// If none but `self` refers to the schema, then the mutation will be in-place.
/// Otherwise, the schema must be cloned, mutated,
/// and then the cloned version is written back to the table.
pub fn with_mut_schema(&mut self, with: impl FnOnce(&mut TableSchema)) {
with(Arc::make_mut(&mut self.schema));
pub fn with_mut_schema<R>(&mut self, with: impl FnOnce(&mut TableSchema) -> R) -> R {
with(Arc::make_mut(&mut self.schema))
}
/// Returns a new [`TableIndex`] for `table`.
@@ -1100,35 +1104,42 @@ impl Table {
/// # Safety
///
/// Caller must promise that `index` was constructed with the same row type/layout as this table.
pub unsafe fn add_index(&mut self, index_id: IndexId, index: TableIndex) {
pub unsafe fn add_index(&mut self, index_id: IndexId, index: TableIndex) -> Option<PointerMap> {
let is_unique = index.is_unique();
self.indexes.insert(index_id, index);
// Remove the pointer map, if any.
if is_unique {
self.pointer_map = None;
self.pointer_map.take()
} else {
None
}
}
/// Removes an index from the table.
///
/// Returns whether an index existed with `index_id`.
pub fn delete_index(&mut self, blob_store: &dyn BlobStore, index_id: IndexId) -> bool {
let index = self.indexes.remove(&index_id);
let ret = index.is_some();
pub fn delete_index(
&mut self,
blob_store: &dyn BlobStore,
index_id: IndexId,
pointer_map: Option<PointerMap>,
) -> Option<(TableIndex, IndexSchema)> {
let index = self.indexes.remove(&index_id)?;
// If we removed the last unique index, add a pointer map.
if index.is_some_and(|i| i.is_unique()) && !self.indexes.values().any(|idx| idx.is_unique()) {
self.rebuild_pointer_map(blob_store);
if index.is_unique() && !self.indexes.values().any(|idx| idx.is_unique()) {
self.pointer_map = Some(pointer_map.unwrap_or_else(|| self.rebuild_pointer_map(blob_store)));
}
// Remove index from schema.
//
// This likely will do a clone-write as over time?
// The schema might have found other referents.
self.with_mut_schema(|s| s.indexes.retain(|x| x.index_id != index_id));
ret
let schema = self
.with_mut_schema(|s| s.remove_index(index_id))
.expect("there should be an index with `index_id`");
Some((index, schema))
}
/// Returns an iterator over all the rows of `self`, yielded as [`RefRef`]s.
@@ -1225,7 +1236,7 @@ impl Table {
// Recompute table metadata based on the new pages.
// Compute the row count first, in case later computations want to use it as a capacity to pre-allocate.
self.compute_row_count(blob_store);
self.rebuild_pointer_map(blob_store);
self.pointer_map = Some(self.rebuild_pointer_map(blob_store));
}
/// Consumes the table, returning some constituents needed for merge.
@@ -1735,16 +1746,10 @@ impl<'a> Iterator for IndexScanPointIter<'a> {
type Item = RowRef<'a>;
fn next(&mut self) -> Option<Self::Item> {
let ptr = self.btree_index_iter.next()?;
// FIXME: Determine if this is correct and if so use `_unchecked`.
// Will a table's index necessarily hold only pointers into that index?
// Edge case: if an index is added during a transaction which then scans that index,
// it appears that the newly-created `TxState` index
// will also hold pointers into the `CommittedState`.
//
// SAFETY: Assuming this is correct,
// `ptr` came from the index, which always holds pointers to valid rows.
self.table.get_row_ref(self.blob_store, ptr)
self.btree_index_iter.next().map(|ptr| {
// SAFETY: `ptr` came from the index, which always holds pointers to valid rows for its table.
unsafe { self.table.get_row_ref_unchecked(self.blob_store, ptr) }
})
}
}
@@ -1765,16 +1770,10 @@ impl<'a> Iterator for IndexScanRangeIter<'a> {
type Item = RowRef<'a>;
fn next(&mut self) -> Option<Self::Item> {
let ptr = self.btree_index_iter.next()?;
// FIXME: Determine if this is correct and if so use `_unchecked`.
// Will a table's index necessarily hold only pointers into that index?
// Edge case: if an index is added during a transaction which then scans that index,
// it appears that the newly-created `TxState` index
// will also hold pointers into the `CommittedState`.
//
// SAFETY: Assuming this is correct,
// `ptr` came from the index, which always holds pointers to valid rows.
self.table.get_row_ref(self.blob_store, ptr)
self.btree_index_iter.next().map(|ptr| {
// SAFETY: `ptr` came from the index, which always holds pointers to valid rows for its table.
unsafe { self.table.get_row_ref_unchecked(self.blob_store, ptr) }
})
}
}
@@ -1930,14 +1929,12 @@ impl Table {
/// Called when restoring from a snapshot after installing the pages,
/// but after computing the row count,
/// since snapshots do not save the pointer map..
fn rebuild_pointer_map(&mut self, blob_store: &dyn BlobStore) {
fn rebuild_pointer_map(&mut self, blob_store: &dyn BlobStore) -> PointerMap {
// TODO(perf): Pre-allocate `PointerMap.map` with capacity `self.row_count`.
// Alternatively, do this at the same time as `compute_row_count`.
let ptrs = self
.scan_rows(blob_store)
self.scan_rows(blob_store)
.map(|row_ref| (row_ref.row_hash(), row_ref.pointer()))
.collect::<PointerMap>();
self.pointer_map = Some(ptrs);
.collect()
}
/// Compute and store `self.row_count` and `self.blob_store_bytes`
+50 -1
View File
@@ -1181,7 +1181,7 @@ impl TableIndex {
}
}
/// Extends [`TableIndex`] with `rows`.s
/// Extends [`TableIndex`] with `rows`.
///
/// Returns the first unique constraint violation caused when adding this index, if any.
///
@@ -1201,6 +1201,55 @@ impl TableIndex {
.try_for_each(|row_ref| unsafe { self.check_and_insert(row_ref) })
}
/// Returns an error with the first unique constraint violation that
/// would occur if `self` and `other` were to be merged.
///
/// The [`RowPointer`] in the error identifies the row in `self`
/// that already occupies the conflicting key.
///
/// # Panics
///
/// Panics if `self` and `other` were built as different index kinds
/// (different key type or uniqueness); callers are expected to only
/// compare indices over the same key type.
pub fn can_merge(&self, other: &Self) -> Result<(), RowPointer> {
    use TypedIndex::*;
    match (&self.idx, &other.idx) {
        // For non-unique indices, it's always possible to merge.
        (BtreeBool(_), BtreeBool(_))
        | (BtreeU8(_), BtreeU8(_))
        | (BtreeSumTag(_), BtreeSumTag(_))
        | (BtreeI8(_), BtreeI8(_))
        | (BtreeU16(_), BtreeU16(_))
        | (BtreeI16(_), BtreeI16(_))
        | (BtreeU32(_), BtreeU32(_))
        | (BtreeI32(_), BtreeI32(_))
        | (BtreeU64(_), BtreeU64(_))
        | (BtreeI64(_), BtreeI64(_))
        | (BtreeU128(_), BtreeU128(_))
        | (BtreeI128(_), BtreeI128(_))
        | (BtreeU256(_), BtreeU256(_))
        | (BtreeI256(_), BtreeI256(_))
        | (BtreeString(_), BtreeString(_))
        | (BtreeAV(_), BtreeAV(_)) => Ok(()),
        // For unique indices, we'll need to see if everything in `other` can be added to `idx`.
        // The btree-backed unique maps report the conflict as `&RowPointer`,
        // so the pointer is copied out with `map_err(|ptr| *ptr)`.
        (UniqueBtreeBool(idx), UniqueBtreeBool(other)) => idx.can_merge(other).map_err(|ptr| *ptr),
        (UniqueBtreeU8(idx), UniqueBtreeU8(other)) => idx.can_merge(other).map_err(|ptr| *ptr),
        (UniqueBtreeSumTag(idx), UniqueBtreeSumTag(other)) => idx.can_merge(other).map_err(|ptr| *ptr),
        (UniqueBtreeI8(idx), UniqueBtreeI8(other)) => idx.can_merge(other).map_err(|ptr| *ptr),
        (UniqueBtreeU16(idx), UniqueBtreeU16(other)) => idx.can_merge(other).map_err(|ptr| *ptr),
        (UniqueBtreeI16(idx), UniqueBtreeI16(other)) => idx.can_merge(other).map_err(|ptr| *ptr),
        (UniqueBtreeU32(idx), UniqueBtreeU32(other)) => idx.can_merge(other).map_err(|ptr| *ptr),
        (UniqueBtreeI32(idx), UniqueBtreeI32(other)) => idx.can_merge(other).map_err(|ptr| *ptr),
        (UniqueBtreeU64(idx), UniqueBtreeU64(other)) => idx.can_merge(other).map_err(|ptr| *ptr),
        (UniqueBtreeI64(idx), UniqueBtreeI64(other)) => idx.can_merge(other).map_err(|ptr| *ptr),
        (UniqueBtreeU128(idx), UniqueBtreeU128(other)) => idx.can_merge(other).map_err(|ptr| *ptr),
        (UniqueBtreeI128(idx), UniqueBtreeI128(other)) => idx.can_merge(other).map_err(|ptr| *ptr),
        (UniqueBtreeU256(idx), UniqueBtreeU256(other)) => idx.can_merge(other).map_err(|ptr| *ptr),
        (UniqueBtreeI256(idx), UniqueBtreeI256(other)) => idx.can_merge(other).map_err(|ptr| *ptr),
        (UniqueBtreeString(idx), UniqueBtreeString(other)) => idx.can_merge(other).map_err(|ptr| *ptr),
        (UniqueBtreeAV(idx), UniqueBtreeAV(other)) => idx.can_merge(other).map_err(|ptr| *ptr),
        // The direct-index backends already return `RowPointer` by value.
        (UniqueDirectU8(idx), UniqueDirectU8(other)) => idx.can_merge(other),
        (UniqueDirectSumTag(idx), UniqueDirectSumTag(other)) => idx.can_merge(other),
        (UniqueDirectU16(idx), UniqueDirectU16(other)) => idx.can_merge(other),
        (UniqueDirectU32(idx), UniqueDirectU32(other)) => idx.can_merge(other),
        (UniqueDirectU64(idx), UniqueDirectU64(other)) => idx.can_merge(other),
        // Every remaining combination pairs two different index kinds,
        // which this method is never called with.
        _ => unreachable!("non-matching index kinds"),
    }
}
/// Deletes all entries from the index, leaving it empty.
///
/// When inserting a newly-created index into the committed state,
@@ -120,6 +120,18 @@ impl UniqueDirectFixedCapIndex {
self.array.fill(NONE_PTR);
self.len = 0;
}
/// Checks whether `other` could be merged into `self` without
/// violating uniqueness.
///
/// On conflict, returns `Err` with the row pointer that `self` holds
/// at the first key occupied in both indices
/// (with the reserved bit cleared).
pub(crate) fn can_merge(&self, other: &UniqueDirectFixedCapIndex) -> Result<(), RowPointer> {
    // A given key maps to the same slot position in both arrays,
    // so a violation is exactly a position where both slots are occupied.
    let conflict = self
        .array
        .iter()
        .zip(other.array.iter())
        .find(|(slot_s, slot_o)| **slot_s != NONE_PTR && **slot_o != NONE_PTR);
    match conflict {
        Some((occupied, _)) => Err(occupied.with_reserved_bit(false)),
        None => Ok(()),
    }
}
}
/// An iterator over a range of keys in a [`UniqueDirectFixedCapIndex`].
@@ -204,6 +204,25 @@ impl UniqueDirectIndex {
self.outer.clear();
self.len = 0;
}
/// Checks whether `other` could be merged into `self` without
/// violating uniqueness.
///
/// On conflict, returns `Err` with the row pointer that `self` holds
/// at the first key occupied in both indices
/// (with the reserved bit cleared).
pub(crate) fn can_merge(&self, other: &UniqueDirectIndex) -> Result<(), RowPointer> {
    // A given key maps to the same (outer, inner) position in both indices,
    // so a violation is exactly a position where both slots are occupied.
    // Outer entries absent on either side cannot conflict and are skipped.
    let conflict = self
        .outer
        .iter()
        .zip(other.outer.iter())
        .filter_map(|(inner_s, inner_o)| Some((inner_s.as_ref()?, inner_o.as_ref()?)))
        .flat_map(|(inner_s, inner_o)| inner_s.inner.iter().zip(inner_o.inner.iter()))
        .find(|(slot_s, slot_o)| **slot_s != NONE_PTR && **slot_o != NONE_PTR);
    match conflict {
        Some((occupied, _)) => Err(occupied.with_reserved_bit(false)),
        None => Ok(()),
    }
}
}
/// An iterator over the potential value in a [`UniqueDirectMap`] for a given key.
@@ -81,6 +81,15 @@ impl<K: Ord, V: Ord> UniqueMap<K, V> {
pub fn clear(&mut self) {
self.map.clear();
}
/// Checks whether `other` could be merged into `self` without
/// violating uniqueness.
///
/// On conflict, returns `Err` borrowing the value that `self` stores
/// under the first of `other`'s keys also present in `self`.
pub(crate) fn can_merge(&self, other: &UniqueMap<K, V>) -> Result<(), &V> {
    other
        .map
        .keys()
        .find_map(|key| self.map.get(key))
        .map_or(Ok(()), Err)
}
}
/// An iterator over the potential value in a [`UniqueMap`] for a given key.