From 0af71cf7f36ad678600a932d2cfcb11aa2cb696e Mon Sep 17 00:00:00 2001 From: Mazdak Farrokhzad Date: Thu, 17 Apr 2025 21:48:08 +0200 Subject: [PATCH] datastore: apply schema changes immediately to committed state. --- .../locking_tx_datastore/committed_state.rs | 191 +- .../locking_tx_datastore/datastore.rs | 5 +- .../datastore/locking_tx_datastore/mut_tx.rs | 1673 ++++++++--------- .../locking_tx_datastore/sequence.rs | 4 +- .../locking_tx_datastore/state_view.rs | 161 +- .../db/datastore/locking_tx_datastore/tx.rs | 11 +- .../locking_tx_datastore/tx_state.rs | 89 +- crates/core/src/vm.rs | 2 +- crates/sats/src/product_value.rs | 2 +- crates/schema/src/schema.rs | 31 +- crates/table/src/table.rs | 75 +- crates/table/src/table_index/mod.rs | 51 +- .../unique_direct_fixed_cap_index.rs | 12 + .../src/table_index/unique_direct_index.rs | 19 + crates/table/src/table_index/uniquemap.rs | 9 + 15 files changed, 1170 insertions(+), 1165 deletions(-) diff --git a/crates/core/src/db/datastore/locking_tx_datastore/committed_state.rs b/crates/core/src/db/datastore/locking_tx_datastore/committed_state.rs index b7d2eddfe7..fb18c4f344 100644 --- a/crates/core/src/db/datastore/locking_tx_datastore/committed_state.rs +++ b/crates/core/src/db/datastore/locking_tx_datastore/committed_state.rs @@ -2,14 +2,10 @@ use super::{ datastore::Result, delete_table::DeleteTable, sequence::{Sequence, SequencesState}, - state_view::{IterByColRangeTx, StateView}, - tx_state::{IndexIdMap, RemovedIndexIdSet, TxState}, + state_view::{IterByColRangeTx, IterTx, ScanIterByColRangeTx, StateView}, + tx_state::{IndexIdMap, PendingSchemaChange, TxState}, IterByColEqTx, }; -use crate::{ - db::datastore::locking_tx_datastore::state_view::{IterTx, ScanIterByColRangeTx}, - error::IndexError, -}; use crate::{ db::{ datastore::{ @@ -25,7 +21,7 @@ use crate::{ }, db_metrics::DB_METRICS, }, - error::TableError, + error::{IndexError, TableError}, execution_context::ExecutionContext, }; use anyhow::anyhow; @@ -116,14 
+112,14 @@ impl StateView for CommittedState { cols: ColList, range: R, ) -> Result> { - // TODO: Why does this unconditionally return a `Scan` iter, - // instead of trying to return a `CommittedIndex` iter? - // Answer: Because CommittedIndexIter::tx_state: Option<&'a TxState> need to be Some to read after reopen - Ok(IterByColRangeTx::Scan(ScanIterByColRangeTx::new( - self.iter(table_id)?, - cols, - range, - ))) + match self.index_seek(table_id, &cols, &range) { + Some(iter) => Ok(IterByColRangeTx::Index(iter)), + None => Ok(IterByColRangeTx::Scan(ScanIterByColRangeTx::new( + self.iter(table_id)?, + cols, + range, + ))), + } } fn iter_by_col_eq<'a, 'r>( @@ -386,7 +382,7 @@ impl CommittedState { // but before creating any user tables. // The `sequence` we read out of `row_ref` above, and used to construct `seq`, // will correctly reflect the state after creating user tables. - sequence_state.insert(seq.id(), seq); + sequence_state.insert(seq); } Ok(()) } @@ -412,7 +408,7 @@ impl CommittedState { for index_row in rows { let index_id = index_row.index_id; let table_id = index_row.table_id; - let Some((table, blob_store)) = self.get_table_and_blob_store(table_id) else { + let (Some(table), blob_store, index_id_map) = self.get_table_and_blob_store_mut(table_id) else { panic!("Cannot create index for table which doesn't exist in committed state"); }; let algo: IndexAlgorithm = index_row.index_algorithm.into(); @@ -422,7 +418,7 @@ impl CommittedState { let index = table.new_index(&algo, is_unique)?; // SAFETY: `index` was derived from `table`. unsafe { table.insert_index(blob_store, index_id, index) }; - self.index_id_map.insert(index_id, table_id); + index_id_map.insert(index_id, table_id); } Ok(()) } @@ -547,9 +543,6 @@ impl CommittedState { pub(super) fn merge(&mut self, tx_state: TxState, ctx: &ExecutionContext) -> TxData { let mut tx_data = TxData::default(); - // First, merge index id fast-lookup map changes and delete indices. 
- self.merge_index_map(tx_state.index_id_map, tx_state.index_id_map_removals.as_deref()); - // First, apply deletes. This will free up space in the committed tables. self.merge_apply_deletes(&mut tx_data, tx_state.delete_tables); @@ -569,7 +562,7 @@ impl CommittedState { fn merge_apply_deletes(&mut self, tx_data: &mut TxData, delete_tables: BTreeMap) { for (table_id, row_ptrs) in delete_tables { - if let Some((table, blob_store)) = self.get_table_and_blob_store(table_id) { + if let (Some(table), blob_store, _) = self.get_table_and_blob_store_mut(table_id) { let mut deletes = Vec::with_capacity(row_ptrs.len()); // Note: we maintain the invariant that the delete_tables @@ -631,19 +624,7 @@ impl CommittedState { tx_data.set_inserts_for_table(table_id, table_name, inserts.into()); } - let (schema, indexes, pages) = tx_table.consume_for_merge(); - - // Add all newly created indexes to the committed state. - for (index_id, mut index) in indexes { - if !commit_table.indexes.contains_key(&index_id) { - index.clear(); - // SAFETY: `tx_table` is derived from `commit_table`, - // so they have the same row type. - // This entails that all indices in `tx_table` - // were constructed with the same row type/layout as `commit_table`. - unsafe { commit_table.insert_index(commit_blob_store, index_id, index) }; - } - } + let (schema, _indexes, pages) = tx_table.consume_for_merge(); // The schema may have been modified in the transaction. // Update this last to placate borrowck and avoid a clone. @@ -655,43 +636,104 @@ impl CommittedState { } } - fn merge_index_map(&mut self, index_id_map: IndexIdMap, index_id_map_removals: Option<&RemovedIndexIdSet>) { - // Remove indices that tx-state removed. - // It's not necessarily the case that the index already existed in the committed state. 
- for (index_id, table_id) in index_id_map_removals - .into_iter() - .flatten() - .filter_map(|index_id| self.index_id_map.remove(index_id).map(|x| (*index_id, x))) - { - assert!(self - .tables - .get_mut(&table_id) - .expect("table to delete index from should exist") - .delete_index(&self.blob_store, index_id)); + /// Rolls back the changes immediately made to the committed state during a transaction. + pub(super) fn rollback(&mut self, seq_state: &mut SequencesState, tx_state: TxState) { + // Roll back the changes in the reverse order in which they were made + // so that e.g., the last change is undone first. + for change in tx_state.pending_schema_changes.into_iter().rev() { + self.rollback_pending_schema_change(seq_state, change); + } + } + + fn rollback_pending_schema_change( + &mut self, + seq_state: &mut SequencesState, + change: PendingSchemaChange, + ) -> Option<()> { + use PendingSchemaChange::*; + match change { + // An index was removed. Add it back. + IndexRemoved(table_id, index_id, table_index, index_schema) => { + let table = self.tables.get_mut(&table_id)?; + // SAFETY: `table_index` was derived from `table`. + unsafe { table.add_index(index_id, table_index) }; + table.with_mut_schema(|s| s.update_index(index_schema)); + self.index_id_map.insert(index_id, table_id); + } + // An index was added. Remove it. + IndexAdded(table_id, index_id, pointer_map) => { + let table = self.tables.get_mut(&table_id)?; + table.delete_index(&self.blob_store, index_id, pointer_map); + table.with_mut_schema(|s| s.remove_index(index_id)); + self.index_id_map.remove(&index_id); + } + // A table was removed. Add it back. + TableRemoved(table_id, table) => { + // We don't need to deal with sub-components. + // That is, we don't need to add back indices and such. + // Instead, there will be separate pending schema changes like `IndexRemoved`. + self.tables.insert(table_id, table); + } + // A table was added. Remove it. 
+ TableAdded(table_id) => { + // We don't need to deal with sub-components. + // That is, we don't need to remove indices and such. + // Instead, there will be separate pending schema changes like `IndexAdded`. + self.tables.remove(&table_id); + } + // A table's access was changed. Change back to the old one. + TableAlterAccess(table_id, access) => { + let table = self.tables.get_mut(&table_id)?; + table.with_mut_schema(|s| s.table_access = access); + } + // A constraint was removed. Add it back. + ConstraintRemoved(table_id, constraint_schema) => { + let table = self.tables.get_mut(&table_id)?; + table.with_mut_schema(|s| s.update_constraint(constraint_schema)); + } + // A constraint was added. Remove it. + ConstraintAdded(table_id, constraint_id) => { + let table = self.tables.get_mut(&table_id)?; + table.with_mut_schema(|s| s.remove_constraint(constraint_id)); + } + // A sequence was removed. Add it back. + SequenceRemoved(table_id, seq, schema) => { + let table = self.tables.get_mut(&table_id)?; + table.with_mut_schema(|s| s.update_sequence(schema)); + seq_state.insert(seq); + } + // A sequence was added. Remove it. + SequenceAdded(table_id, sequence_id) => { + let table = self.tables.get_mut(&table_id)?; + table.with_mut_schema(|s| s.remove_sequence(sequence_id)); + seq_state.remove(sequence_id); + } } - // Add the ones tx-state added. 
- self.index_id_map.extend(index_id_map); + Some(()) } pub(super) fn get_table(&self, table_id: TableId) -> Option<&Table> { self.tables.get(&table_id) } - pub(super) fn get_table_mut(&mut self, table_id: TableId) -> (Option<&mut Table>, &PagePool) { - (self.tables.get_mut(&table_id), &self.page_pool) + #[allow(clippy::unnecessary_lazy_evaluations)] + pub fn get_table_and_blob_store(&self, table_id: TableId) -> Result> { + let table = self + .get_table(table_id) + .ok_or_else(|| TableError::IdNotFoundState(table_id))?; + Ok((table, &self.blob_store as &dyn BlobStore, &self.index_id_map)) } - pub fn get_table_and_blob_store_immutable(&self, table_id: TableId) -> Option<(&Table, &dyn BlobStore)> { - self.tables - .get(&table_id) - .map(|tbl| (tbl, &self.blob_store as &dyn BlobStore)) - } - - pub(super) fn get_table_and_blob_store(&mut self, table_id: TableId) -> Option<(&mut Table, &mut dyn BlobStore)> { - self.tables - .get_mut(&table_id) - .map(|tbl| (tbl, &mut self.blob_store as &mut dyn BlobStore)) + pub(super) fn get_table_and_blob_store_mut( + &mut self, + table_id: TableId, + ) -> (Option<&mut Table>, &mut dyn BlobStore, &mut IndexIdMap) { + ( + self.tables.get_mut(&table_id), + &mut self.blob_store as &mut dyn BlobStore, + &mut self.index_id_map, + ) } fn make_table(schema: Arc) -> Table { @@ -712,7 +754,7 @@ impl CommittedState { .entry(table_id) .or_insert_with(|| Self::make_table(schema.clone())); let blob_store = &mut self.blob_store; - let pool = &mut self.page_pool; + let pool = &self.page_pool; (table, blob_store, pool) } @@ -750,25 +792,4 @@ impl CommittedState { } } -pub struct CommittedIndexIterWithDeletedMutTx<'a> { - committed_rows: IndexScanRangeIter<'a>, - del_table: &'a DeleteTable, -} - -impl<'a> CommittedIndexIterWithDeletedMutTx<'a> { - pub(super) fn new(committed_rows: IndexScanRangeIter<'a>, del_table: &'a DeleteTable) -> Self { - Self { - committed_rows, - del_table, - } - } -} - -impl<'a> Iterator for 
CommittedIndexIterWithDeletedMutTx<'a> { - type Item = RowRef<'a>; - - fn next(&mut self) -> Option { - self.committed_rows - .find(|row_ref| !self.del_table.contains(row_ref.pointer())) - } -} +pub(super) type CommitTableForInsertion<'a> = (&'a Table, &'a dyn BlobStore, &'a IndexIdMap); diff --git a/crates/core/src/db/datastore/locking_tx_datastore/datastore.rs b/crates/core/src/db/datastore/locking_tx_datastore/datastore.rs index 7652cc3d5a..cfeee8ecc7 100644 --- a/crates/core/src/db/datastore/locking_tx_datastore/datastore.rs +++ b/crates/core/src/db/datastore/locking_tx_datastore/datastore.rs @@ -638,7 +638,8 @@ impl MutTxDatastore for Locking { index_id: IndexId, row: &[u8], ) -> Result<(ColList, RowRef<'a>, UpdateFlags)> { - tx.update(table_id, index_id, row) + let (gens, row_ref, update_flags) = tx.update(table_id, index_id, row)?; + Ok((gens, row_ref.collapse(), update_flags)) } fn metadata_mut_tx(&self, tx: &Self::MutTx) -> Result> { @@ -2358,7 +2359,7 @@ mod tests { // the delete should first mark the committed row as deleted in the delete tables, // and then it should remove it from the delete tables upon insertion, // rather than actually inserting it in the tx state. - // So the second transaction should be observationally a no-op.s + // So the second transaction should be observationally a no-op. // There was a bug in the datastore that did not respect this in the presence of a unique index. 
let (deleted_1, tx_data_1) = update(&datastore)?; let (deleted_2, tx_data_2) = update(&datastore)?; diff --git a/crates/core/src/db/datastore/locking_tx_datastore/mut_tx.rs b/crates/core/src/db/datastore/locking_tx_datastore/mut_tx.rs index 9a9bd15653..e9349b5b75 100644 --- a/crates/core/src/db/datastore/locking_tx_datastore/mut_tx.rs +++ b/crates/core/src/db/datastore/locking_tx_datastore/mut_tx.rs @@ -1,11 +1,11 @@ use super::{ - committed_state::CommittedState, + committed_state::{CommitTableForInsertion, CommittedState}, datastore::{Result, TxMetrics}, delete_table::DeleteTable, sequence::{Sequence, SequencesState}, - state_view::{IndexSeekIterIdMutTx, ScanIterByColRangeMutTx, StateView}, + state_view::{IterByColEqMutTx, IterByColRangeMutTx, IterMutTx, ScanIterByColRangeMutTx, StateView}, tx::TxId, - tx_state::{IndexIdMap, TxState}, + tx_state::{IndexIdMap, PendingSchemaChange, TxState, TxTableForInsertion}, SharedMutexGuard, SharedWriteGuard, }; use crate::db::datastore::system_tables::{ @@ -15,23 +15,14 @@ use crate::db::datastore::system_tables::{ ST_COLUMN_ID, ST_CONSTRAINT_ID, ST_INDEX_ID, ST_ROW_LEVEL_SECURITY_ID, ST_SCHEDULED_ID, ST_SEQUENCE_ID, ST_TABLE_ID, }; -use crate::db::datastore::traits::{RowTypeForTable, TxData}; -use crate::db::datastore::{ - locking_tx_datastore::committed_state::CommittedIndexIterWithDeletedMutTx, traits::InsertFlags, -}; -use crate::db::datastore::{ - locking_tx_datastore::state_view::{ - IndexSeekIterIdWithDeletedMutTx, IterByColEqMutTx, IterByColRangeMutTx, IterMutTx, - }, - traits::UpdateFlags, -}; +use crate::db::datastore::traits::{InsertFlags, RowTypeForTable, TxData, UpdateFlags}; use crate::execution_context::Workload; use crate::{ error::{IndexError, SequenceError, TableError}, execution_context::ExecutionContext, }; -use core::cell::RefCell; use core::ops::RangeBounds; +use core::{cell::RefCell, mem}; use core::{iter, ops::Bound}; use smallvec::SmallVec; use spacetimedb_execution::{dml::MutDatastore, Datastore, 
DeltaStore}; @@ -51,9 +42,11 @@ use spacetimedb_sats::{ }; use spacetimedb_schema::schema::{ConstraintSchema, IndexSchema, RowLevelSecuritySchema, SequenceSchema, TableSchema}; use spacetimedb_table::{ - blob_store::{BlobStore, HashMapBlobStore}, + blob_store::BlobStore, indexes::{RowPointer, SquashedOffset}, - table::{DuplicateError, IndexScanRangeIter, InsertError, RowRef, Table, TableAndIndex}, + static_assert_size, + table::{BlobNumBytes, DuplicateError, IndexScanRangeIter, InsertError, RowRef, Table, TableAndIndex}, + table_index::TableIndex, }; use std::{ sync::Arc, @@ -77,6 +70,8 @@ pub struct MutTxId { pub(crate) metrics: ExecutionMetrics, } +static_assert_size!(MutTxId, 400); + impl Datastore for MutTxId { fn blob_store(&self) -> &dyn BlobStore { &self.committed_state_write_lock.blob_store @@ -121,7 +116,14 @@ impl MutDatastore for MutTxId { } impl MutTxId { - fn drop_col_eq(&mut self, table_id: TableId, col_pos: ColId, value: &AlgebraicValue) -> Result<()> { + /// Push a pending schema change. + fn push_schema_change(&mut self, change: PendingSchemaChange) { + self.tx_state.pending_schema_changes.push(change); + } + + /// Deletes all the rows in table with `table_id` + /// where the column with `col_pos` equals `value`. + fn delete_col_eq(&mut self, table_id: TableId, col_pos: ColId, value: &AlgebraicValue) -> Result<()> { let rows = self.iter_by_col_eq(table_id, col_pos, value)?; let ptrs_to_delete = rows.map(|row_ref| row_ref.pointer()).collect::>(); if ptrs_to_delete.is_empty() { @@ -154,14 +156,15 @@ impl MutTxId { // checks for children are performed in the relevant `create_...` functions. } - log::trace!("TABLE CREATING: {}", table_schema.table_name); + let table_name = table_schema.table_name.clone(); + log::trace!("TABLE CREATING: {}", table_name); // Insert the table row into `st_tables` // NOTE: Because `st_tables` has a unique index on `table_name`, this will // fail if the table already exists. 
let row = StTableRow { table_id: TableId::SENTINEL, - table_name: table_schema.table_name[..].into(), + table_name: table_name.clone(), table_type: table_schema.table_type, table_access: table_schema.table_access, table_primary_key: table_schema.primary_key.map(Into::into), @@ -187,9 +190,11 @@ impl MutTxId { self.insert_via_serialize_bsatn(ST_COLUMN_ID, &row)?; } - let mut schema_internal = table_schema.clone(); - // Remove all indexes, constraints, and sequences from the schema; we will add them back later with correct index_id, ... - schema_internal.clear_adjacent_schemas(); + let schedule = table_schema.schedule.clone(); + let mut schema_internal = table_schema; + // Extract all indexes, constraints, and sequences from the schema. + // We will add them back later with correct ids. + let (indices, sequences, constraints) = schema_internal.take_adjacent_schemas(); // Create the in memory representation of the table // NOTE: This should be done before creating the indexes @@ -198,7 +203,7 @@ impl MutTxId { self.create_table_internal(schema_internal.into()); // Insert the scheduled table entry into `st_scheduled` - if let Some(schedule) = table_schema.schedule { + if let Some(schedule) = schedule { let row = StScheduledRow { table_id: schedule.table_id, schedule_id: ScheduleId::SENTINEL, @@ -215,49 +220,45 @@ impl MutTxId { table.with_mut_schema(|s| s.schedule.as_mut().unwrap().schedule_id = id); } - // Insert constraints into `st_constraints` - for constraint in table_schema.constraints.iter().cloned() { - self.create_constraint(constraint)?; - } - - // Insert sequences into `st_sequences` - for seq in table_schema.sequences { - self.create_sequence(seq)?; - } - - // Create the indexes for the table - for index in table_schema.indexes { + // Create the indexes for the table. 
+ for index in indices { let col_set = ColSet::from(index.index_algorithm.columns()); - let is_unique = table_schema - .constraints - .iter() - .any(|c| c.data.unique_columns() == Some(&col_set)); + let is_unique = constraints.iter().any(|c| c.data.unique_columns() == Some(&col_set)); self.create_index(index, is_unique)?; } - log::trace!("TABLE CREATED: {}, table_id: {table_id}", table_schema.table_name); + // Insert constraints into `st_constraints`. + for constraint in constraints { + self.create_constraint(constraint)?; + } + + // Insert sequences into `st_sequences`. + for seq in sequences { + self.create_sequence(seq)?; + } + + log::trace!("TABLE CREATED: {}, table_id: {table_id}", table_name); Ok(table_id) } fn create_table_internal(&mut self, schema: Arc) { - self.tx_state - .insert_tables - .insert(schema.table_id, Table::new(schema, SquashedOffset::TX_STATE)); + // Construct the in memory tables. + let table_id = schema.table_id; + let commit_table = Table::new(schema, SquashedOffset::COMMITTED_STATE); + let tx_table = commit_table.clone_structure(SquashedOffset::TX_STATE); + + // Add them to the committed and tx states. + self.committed_state_write_lock.tables.insert(table_id, commit_table); + self.tx_state.insert_tables.insert(table_id, tx_table); + + // Record that the committed state table is pending. + self.push_schema_change(PendingSchemaChange::TableAdded(table_id)); } fn get_row_type(&self, table_id: TableId) -> Option<&ProductType> { - if let Some(row_type) = self - .tx_state - .insert_tables - .get(&table_id) - .map(|table| table.get_row_type()) - { - return Some(row_type); - } self.committed_state_write_lock - .tables - .get(&table_id) + .get_table(table_id) .map(|table| table.get_row_type()) } @@ -268,6 +269,10 @@ impl MutTxId { return Ok(RowTypeForTable::Ref(row_type)); } + // TODO(centril): if the table exists, this is now dead code, + // as we will immediately insert a table into the committed state upon creation. 
+ // So simplify this and merge with `get_row_type`. + // // Look up the columns for the table in question. // NOTE: This is quite an expensive operation, although we only need // to do this in situations where there is not currently an in memory @@ -293,11 +298,11 @@ impl MutTxId { } // Drop the table and their columns - self.drop_col_eq(ST_TABLE_ID, StTableFields::TableId.col_id(), &table_id.into())?; - self.drop_col_eq(ST_COLUMN_ID, StColumnFields::TableId.col_id(), &table_id.into())?; + self.delete_col_eq(ST_TABLE_ID, StTableFields::TableId.col_id(), &table_id.into())?; + self.delete_col_eq(ST_COLUMN_ID, StColumnFields::TableId.col_id(), &table_id.into())?; if let Some(schedule) = &schema.schedule { - self.drop_col_eq( + self.delete_col_eq( ST_SCHEDULED_ID, StScheduledFields::ScheduleId.col_id(), &schedule.schedule_id.into(), @@ -305,20 +310,25 @@ impl MutTxId { } // Delete the table and its rows and indexes from memory. - // TODO: This needs to not remove it from the committed state, because it can still be rolled back. - // We will have to store the deletion in the TxState and then apply it to the CommittedState in commit. + self.tx_state.insert_tables.remove(&table_id); + self.tx_state.delete_tables.remove(&table_id); + let commit_table = self + .committed_state_write_lock + .tables + .remove(&table_id) + .expect("there should be a schema in the committed state if we reach here"); + self.push_schema_change(PendingSchemaChange::TableRemoved(table_id, commit_table)); - // NOT use unwrap - self.committed_state_write_lock.tables.remove(&table_id); Ok(()) } + // TODO(centril): remove this. It doesn't seem to be used by anything. pub fn rename_table(&mut self, table_id: TableId, new_name: &str) -> Result<()> { // Update the table's name in st_tables. 
self.update_st_table_row(table_id, |st| st.table_name = new_name.into()) } - fn update_st_table_row(&mut self, table_id: TableId, updater: impl FnOnce(&mut StTableRow)) -> Result<()> { + fn update_st_table_row(&mut self, table_id: TableId, updater: impl FnOnce(&mut StTableRow) -> R) -> Result { // Fetch the row. let st_table_ref = self .iter_by_col_eq(ST_TABLE_ID, StTableFields::TableId, &table_id.into())? @@ -329,10 +339,10 @@ impl MutTxId { // Delete the row, run updates, and insert again. self.delete(ST_TABLE_ID, ptr)?; - updater(&mut row); + let ret = updater(&mut row); self.insert_via_serialize_bsatn(ST_TABLE_ID, &row)?; - Ok(()) + Ok(ret) } pub fn table_id_from_name(&self, table_name: &str) -> Result> { @@ -349,7 +359,7 @@ impl MutTxId { } /// Retrieves or creates the insert tx table for `table_id`. - #[allow(clippy::type_complexity)] + #[allow(clippy::type_complexity, clippy::unnecessary_lazy_evaluations)] fn get_or_create_insert_table_mut( &mut self, table_id: TableId, @@ -357,34 +367,34 @@ impl MutTxId { &mut Table, &mut dyn BlobStore, &mut IndexIdMap, - Option<&Table>, - &HashMapBlobStore, + &mut Table, + &mut dyn BlobStore, )> { - let commit_table = self.committed_state_write_lock.get_table(table_id); + let (commit_table, commit_bs, idx_map) = self.committed_state_write_lock.get_table_and_blob_store_mut(table_id); + let commit_table = commit_table.ok_or_else(|| TableError::IdNotFoundState(table_id))?; // Get the insert table, so we can write the row into it. - self.tx_state - .get_table_and_blob_store_or_maybe_create_from(table_id, commit_table) - .ok_or_else(|| TableError::IdNotFoundState(table_id).into()) - .map(|(tx, bs, idx_map, _)| { - ( - tx, - bs, - idx_map, - commit_table, - &self.committed_state_write_lock.blob_store, - ) - }) + let (tx, bs, ..) 
= self + .tx_state + .get_table_and_blob_store_or_create_from(table_id, commit_table); + Ok((tx, bs, idx_map, commit_table, commit_bs)) } +} +impl MutTxId { /// Set the table access of `table_id` to `access`. pub(crate) fn alter_table_access(&mut self, table_id: TableId, access: StAccess) -> Result<()> { // Write to the table in the tx state. - let (table, ..) = self.get_or_create_insert_table_mut(table_id)?; - table.with_mut_schema(|s| s.table_access = access); + let (tx_table, _, _, commit_table, _) = self.get_or_create_insert_table_mut(table_id)?; + tx_table.with_mut_schema(|s| s.table_access = access); + commit_table.with_mut_schema(|s| s.table_access = access); // Update system tables. - self.update_st_table_row(table_id, |st| st.table_access = access)?; + let old_access = self.update_st_table_row(table_id, |st| mem::replace(&mut st.table_access, access))?; + + // Remember the pending change so we can undo if necessary. + self.push_schema_change(PendingSchemaChange::TableAlterAccess(table_id, old_access)); + Ok(()) } @@ -434,39 +444,34 @@ impl MutTxId { let (table, blob_store, idx_map, commit_table, commit_blob_store) = self.get_or_create_insert_table_mut(table_id)?; - // Create and build the index. - // - // Ensure adding the index does not cause a unique constraint violation due to - // the existing rows having the same value for some column(s). - let mut insert_index = table.new_index(&index.index_algorithm, is_unique)?; - let mut build_from_rows = |table: &Table, bs: &dyn BlobStore| -> Result<()> { - let rows = table.scan_rows(bs); - // SAFETY: (1) `insert_index` was derived from `table` - // which in turn was derived from `commit_table`. 
- let violation = unsafe { insert_index.build_from_rows(rows) }; - if let Err(violation) = violation { - let violation = table - .get_row_ref(bs, violation) - .expect("row came from scanning the table") - .project(&insert_index.indexed_columns) - .expect("`cols` should consist of valid columns for this table"); - return Err(IndexError::from(table.build_error_unique(&insert_index, index_id, violation)).into()); - } - Ok(()) + // Create and build the indices. + let map_violation = |violation, index: &TableIndex, table: &Table, bs: &dyn BlobStore| { + let violation = table + .get_row_ref(bs, violation) + .expect("row came from scanning the table") + .project(&index.indexed_columns) + .expect("`cols` should consist of valid columns for this table"); + IndexError::from(table.build_error_unique(index, index_id, violation)).into() }; - build_from_rows(table, blob_store)?; - // NOTE: Also add all the rows in the already committed table to the index. - // - // FIXME: Is this correct? Index scan iterators (incl. the existing `Locking` versions) - // appear to assume that a table's index refers only to rows within that table, - // and does not handle the case where a `TxState` index refers to `CommittedState` rows. - // - // TODO(centril): An alternative here is to actually add this index to `CommittedState`, - // pretending that it was already committed, and recording this pretense. - // Then, we can roll that back on a failed tx. - if let Some(commit_table) = commit_table { - build_from_rows(commit_table, commit_blob_store)?; - } + // Builds the index and ensures that `table`'s row won't cause a unique constraint violation + // due to the existing rows having the same value for some column(s). + let build_from_rows = |index: &mut TableIndex, table: &Table, bs: &dyn BlobStore| -> Result<()> { + let rows = table.scan_rows(bs); + // SAFETY: (1) `tx_index` / `commit_index` was derived from `table` / `commit_table` + // which in turn was derived from `commit_table`. 
+ let violation = unsafe { index.build_from_rows(rows) }; + violation.map_err(|v| map_violation(v, index, table, bs)) + }; + // Build the tx index. + let mut tx_index = table.new_index(&index.index_algorithm, is_unique)?; + build_from_rows(&mut tx_index, table, blob_store)?; + // Build the commit index. + let mut commit_index = tx_index.clone_structure(); + build_from_rows(&mut commit_index, commit_table, commit_blob_store)?; + // Make sure the two indices can be merged. + commit_index + .can_merge(&tx_index) + .map_err(|v| map_violation(v, &commit_index, commit_table, commit_blob_store))?; log::trace!( "INDEX CREATED: {} for table: {} and algorithm: {:?}", @@ -475,14 +480,17 @@ impl MutTxId { index.index_algorithm ); - // SAFETY: same as (1). - unsafe { table.add_index(index_id, insert_index) }; // Associate `index_id -> table_id` for fast lookup. idx_map.insert(index_id, table_id); - + // SAFETY: same as (1). + unsafe { table.add_index(index_id, tx_index) }; + let pointer_map = unsafe { commit_table.add_index(index_id, commit_index) }; // Update the table's schema. // This won't clone-write when creating a table but likely to otherwise. - table.with_mut_schema(|s| s.indexes.push(index)); + table.with_mut_schema(|s| s.indexes.push(index.clone())); + commit_table.with_mut_schema(|s| s.indexes.push(index)); + // Note the index in pending schema changes. + self.push_schema_change(PendingSchemaChange::IndexAdded(table_id, index_id, pointer_map)); Ok(index_id) } @@ -501,18 +509,21 @@ impl MutTxId { // Remove the index from st_indexes. self.delete(ST_INDEX_ID, st_index_ptr)?; - // Remove the index in the transaction's insert table. - // By altering the insert table, this gets moved over to the committed state on merge. - let (table, blob_store, idx_map, ..) = self.get_or_create_insert_table_mut(table_id)?; - assert!(table.delete_index(blob_store, index_id)); - // Remove the `index_id -> (table_id, col_list)` association from tx state. 
+ // Remove the index in the transaction's insert table and the commit table. + let (tx_table, tx_bs, idx_map, commit_table, commit_bs) = self.get_or_create_insert_table_mut(table_id)?; + tx_table.delete_index(tx_bs, index_id, None); + let (commit_index, index_schema) = commit_table + .delete_index(commit_bs, index_id, None) + .expect("there should be a schema in the committed state if we reach here"); + // Remove the `index_id -> (table_id, col_list)` association. idx_map.remove(&index_id); - // Queue the deletion of the index in the committed state. - // Note that the index could have been added in this tx. - self.tx_state - .index_id_map_removals - .get_or_insert_default() - .insert(index_id); + // Note the index in pending schema changes. + self.push_schema_change(PendingSchemaChange::IndexRemoved( + table_id, + index_id, + commit_index, + index_schema, + )); log::trace!("INDEX DROPPED: {}", index_id); Ok(()) @@ -539,13 +550,11 @@ impl MutTxId { rend: &[u8], ) -> Result<(TableId, IndexScanRanged<'a>)> { // Extract the table id, and commit/tx indices. - let (table_id, commit_index, tx_index) = self.get_table_and_index(index_id); - // Extract the index type and make sure we have a table id. - let (index_ty, table_id) = commit_index - .or(tx_index) - .map(|index| &index.index().key_type) - .zip(table_id) + let (table_id, commit_index, tx_index) = self + .get_table_and_index(index_id) .ok_or_else(|| IndexError::NotFound(index_id))?; + // Extract the index type. + let index_ty = &commit_index.index().key_type; // TODO(centril): Once we have more index types than range-compatible ones, // we'll need to enforce that `index_id` refers to a range-compatible index. @@ -556,74 +565,32 @@ impl MutTxId { // Get an index seek iterator for the tx and committed state. 
let tx_iter = tx_index.map(|i| i.seek_range(&bounds)); - let commit_iter = commit_index.map(|i| i.seek_range(&bounds)); + let commit_iter = commit_index.seek_range(&bounds); - // Chain together the indexed rows in the tx and committed state, - // but don't yield rows deleted in the tx state. - use itertools::Either::*; - use IndexScanRangedInner::*; - let commit_iter = commit_iter.map(|iter| match self.tx_state.get_delete_table(table_id) { - None => Left(iter), - Some(deletes) => Right(IndexScanFilterDeleted { iter, deletes }), - }); - // This is effectively just `tx_iter.into_iter().flatten().chain(commit_iter.into_iter().flatten())`, - // but with all the branching and `Option`s flattened to just one layer. - let iter = match (tx_iter, commit_iter) { - (None, None) => Empty(iter::empty()), - (Some(tx_iter), None) => TxOnly(tx_iter), - (None, Some(Left(commit_iter))) => CommitOnly(commit_iter), - (None, Some(Right(commit_iter))) => CommitOnlyWithDeletes(commit_iter), - (Some(tx_iter), Some(Left(commit_iter))) => Both(tx_iter.chain(commit_iter)), - (Some(tx_iter), Some(Right(commit_iter))) => BothWithDeletes(tx_iter.chain(commit_iter)), - }; - Ok((table_id, IndexScanRanged { inner: iter })) + let dt = self.tx_state.get_delete_table(table_id); + let iter = combine_range_index_iters(dt, tx_iter, commit_iter); + Ok((table_id, iter)) } /// Translate `index_id` to the table id, and commit/tx indices. fn get_table_and_index( &self, index_id: IndexId, - ) -> (Option, Option>, Option>) { - // The order of querying the committed vs. tx state for the translation is not important. - // But it is vastly more likely that it is in the committed state, - // so query that first to avoid two lookups. - // - // Also, the tx state must have the index. - // If the index was e.g., dropped from the tx state but exists physically in the committed state, - // the index does not exist, semantically. - // TODO: handle the case where the table has been dropped in this transaction. 
- let commit_table_id = self + ) -> Option<(TableId, TableAndIndex<'_>, Option>)> { + // The hierarchy is as follows: + // 1. The table exists. + // 2. The commit index exists. + // 3. The tx index exists. + let table_id = self.committed_state_write_lock.get_table_for_index(index_id)?; + + // Index found for commit state, might also exist for tx state. + let commit_index = self .committed_state_write_lock - .get_table_for_index(index_id) - .filter(|_| !self.tx_state_removed_index(index_id)); + .get_index_by_id_with_table(table_id, index_id)?; - let (table_id, commit_index, tx_index) = if let t_id @ Some(table_id) = commit_table_id { - // Index found for commit state, might also exist for tx state. - let commit_index = self - .committed_state_write_lock - .get_index_by_id_with_table(table_id, index_id); - let tx_index = self.tx_state.get_index_by_id_with_table(table_id, index_id); - (t_id, commit_index, tx_index) - } else if let t_id @ Some(table_id) = self.tx_state.get_table_for_index(index_id) { - // Index might exist for tx state. - let tx_index = self.tx_state.get_index_by_id_with_table(table_id, index_id); - (t_id, None, tx_index) - } else { - // No index in either side. - (None, None, None) - }; - (table_id, commit_index, tx_index) - } + let tx_index = self.tx_state.get_index_by_id_with_table(table_id, index_id); - /// Returns whether the index with `index_id` was removed in this transaction. - /// - /// An index removed in the tx state but existing physically in the committed state - /// does not exist semantically. - fn tx_state_removed_index(&self, index_id: IndexId) -> bool { - self.tx_state - .index_id_map_removals - .as_ref() - .is_some_and(|s| s.contains(&index_id)) + Some((table_id, commit_index, tx_index)) } /// Decode the bounds for a ranged index scan for an index typed at `key_type`. 
@@ -724,57 +691,79 @@ impl MutTxId { (range_start, range_end) } - fn get_sequence_mut(&mut self, seq_id: SequenceId) -> Result<&mut Sequence> { - self.sequence_state_lock - .get_sequence_mut(seq_id) - .ok_or_else(|| SequenceError::NotFound(seq_id).into()) - } - pub fn get_next_sequence_value(&mut self, seq_id: SequenceId) -> Result { - { - let sequence = self.get_sequence_mut(seq_id)?; + get_next_sequence_value( + &mut self.tx_state, + &self.committed_state_write_lock, + &mut self.sequence_state_lock, + seq_id, + ) + } +} - // If there are allocated sequence values, return the new value. - // `gen_next_value` internally checks that the new allocation is acceptable, - // i.e. is less than or equal to the allocation amount. - // Note that on restart we start one after the allocation amount. - if let Some(value) = sequence.gen_next_value() { - return Ok(value); - } +fn get_sequence_mut(seq_state: &mut SequencesState, seq_id: SequenceId) -> Result<&mut Sequence> { + seq_state + .get_sequence_mut(seq_id) + .ok_or_else(|| SequenceError::NotFound(seq_id).into()) +} + +fn get_next_sequence_value( + tx_state: &mut TxState, + committed_state: &CommittedState, + seq_state: &mut SequencesState, + seq_id: SequenceId, +) -> Result { + { + let sequence = get_sequence_mut(seq_state, seq_id)?; + + // If there are allocated sequence values, return the new value. + // `gen_next_value` internally checks that the new allocation is acceptable, + // i.e. is less than or equal to the allocation amount. + // Note that on restart we start one after the allocation amount. + if let Some(value) = sequence.gen_next_value() { + return Ok(value); } - // Allocate new sequence values - // If we're out of allocations, then update the sequence row in st_sequences to allocate a fresh batch of sequences. - let old_seq_row_ref = self - .iter_by_col_eq(ST_SEQUENCE_ID, StSequenceFields::SequenceId, &seq_id.into())? 
- .last() - .unwrap(); - let old_seq_row_ptr = old_seq_row_ref.pointer(); - let seq_row = { - let mut seq_row = StSequenceRow::try_from(old_seq_row_ref)?; - - let sequence = self.get_sequence_mut(seq_id)?; - seq_row.allocated = sequence.nth_value(SEQUENCE_ALLOCATION_STEP as usize); - sequence.set_allocation(seq_row.allocated); - seq_row - }; - - self.delete(ST_SEQUENCE_ID, old_seq_row_ptr)?; - // `insert::` rather than `GENERATE = true` because: - // - We have already checked unique constraints during `create_sequence`. - // - Similarly, we have already applied autoinc sequences. - // - We do not want to apply autoinc sequences again, - // since the system table sequence `seq_st_table_table_id_primary_key_auto` - // has ID 0, and would otherwise trigger autoinc. - with_sys_table_buf(|buf| { - to_writer(buf, &seq_row).unwrap(); - self.insert::(ST_SEQUENCE_ID, buf) - })?; - - self.get_sequence_mut(seq_id)? - .gen_next_value() - .ok_or_else(|| SequenceError::UnableToAllocate(seq_id).into()) } + // Allocate new sequence values + // If we're out of allocations, then update the sequence row in st_sequences to allocate a fresh batch of sequences. + let old_seq_row_ref = iter_by_col_eq( + tx_state, + committed_state, + ST_SEQUENCE_ID, + StSequenceFields::SequenceId, + &seq_id.into(), + )? + .last() + .unwrap(); + let old_seq_row_ptr = old_seq_row_ref.pointer(); + let seq_row = { + let mut seq_row = StSequenceRow::try_from(old_seq_row_ref)?; + + let sequence = get_sequence_mut(seq_state, seq_id)?; + seq_row.allocated = sequence.nth_value(SEQUENCE_ALLOCATION_STEP as usize); + sequence.set_allocation(seq_row.allocated); + seq_row + }; + + delete(tx_state, committed_state, ST_SEQUENCE_ID, old_seq_row_ptr)?; + // `insert::` rather than `GENERATE = true` because: + // - We have already checked unique constraints during `create_sequence`. + // - Similarly, we have already applied autoinc sequences. 
+ // - We do not want to apply autoinc sequences again, + // since the system table sequence `seq_st_table_table_id_primary_key_auto` + // has ID 0, and would otherwise trigger autoinc. + with_sys_table_buf(|buf| { + to_writer(buf, &seq_row).unwrap(); + insert::(tx_state, committed_state, seq_state, ST_SEQUENCE_ID, buf) + })?; + + get_sequence_mut(seq_state, seq_id)? + .gen_next_value() + .ok_or_else(|| SequenceError::UnableToAllocate(seq_id).into()) +} + +impl MutTxId { /// Create a sequence. /// Requires: /// - `seq.sequence_id == SequenceId::SENTINEL` @@ -819,10 +808,12 @@ impl MutTxId { sequence_row.sequence_id = seq_id; let schema: SequenceSchema = sequence_row.into(); - self.get_insert_table_mut(schema.table_id)? - // This won't clone-write when creating a table but likely to otherwise. - .with_mut_schema(|s| s.update_sequence(schema.clone())); - self.sequence_state_lock.insert(seq_id, Sequence::new(schema)); + let (tx_table, _, _, commit_table, _) = self.get_or_create_insert_table_mut(table_id)?; + // This won't clone-write when creating a table but likely to otherwise. + tx_table.with_mut_schema(|s| s.update_sequence(schema.clone())); + commit_table.with_mut_schema(|s| s.update_sequence(schema.clone())); + self.sequence_state_lock.insert(Sequence::new(schema)); + self.push_schema_change(PendingSchemaChange::SequenceAdded(table_id, seq_id)); log::trace!("SEQUENCE CREATED: id = {}", seq_id); @@ -830,24 +821,30 @@ impl MutTxId { } pub fn drop_sequence(&mut self, sequence_id: SequenceId) -> Result<()> { + // Ensure the sequence exists. let st_sequence_ref = self .iter_by_col_eq(ST_SEQUENCE_ID, StSequenceFields::SequenceId, &sequence_id.into())? .next() .ok_or_else(|| TableError::IdNotFound(SystemTable::st_sequence, sequence_id.into()))?; let table_id = st_sequence_ref.read_col(StSequenceFields::TableId)?; + // Delete from system tables. self.delete(ST_SEQUENCE_ID, st_sequence_ref.pointer())?; - // TODO: Transactionality. 
- // Currently, a TX which drops a sequence then aborts - // will leave the sequence deleted, - // rather than restoring it during rollback. - self.sequence_state_lock.remove(sequence_id); - if let Some((insert_table, _)) = self.tx_state.get_table_and_blob_store(table_id) { - // This likely will do a clone-write as over time? - // The schema might have found other referents. - insert_table.with_mut_schema(|s| s.remove_sequence(sequence_id)); - } + // Drop the sequence from in-memory tables. + let sequence = self + .sequence_state_lock + .remove(sequence_id) + .expect("there should be a sequence in the committed state if we reach here"); + let (tx_table, _, _, commit_table, _) = self.get_or_create_insert_table_mut(table_id)?; + // This likely will do a clone-write as over time? + // The schema might have found other referents. + tx_table.with_mut_schema(|s| s.remove_sequence(sequence_id)); + let schema = commit_table + .with_mut_schema(|s| s.remove_sequence(sequence_id)) + .expect("there should be a schema in the committed state if we reach here"); + self.push_schema_change(PendingSchemaChange::SequenceRemoved(table_id, sequence, schema)); + Ok(()) } @@ -893,9 +890,9 @@ impl MutTxId { constraint.data ); - // Insert the constraint row into st_constraint - // NOTE: Because st_constraint has a unique index on constraint_name, this will - // fail if the table already exists. + // Insert the constraint row into `st_constraint`. + // NOTE: Because `st_constraint` has a unique index on constraint_name, + // this will fail if the table already exists. let constraint_row = StConstraintRow { table_id, constraint_id: ConstraintId::SENTINEL, @@ -905,28 +902,20 @@ impl MutTxId { let constraint_row = self.insert_via_serialize_bsatn(ST_CONSTRAINT_ID, &constraint_row)?; let constraint_id = constraint_row.1.collapse().read_col(StConstraintFields::ConstraintId)?; - let existed = matches!(constraint_row.1, RowRefInsertion::Existed(_)); - // TODO: Can we return early here? 
- - let (table, ..) = self.get_or_create_insert_table_mut(table_id)?; - constraint.constraint_id = constraint_id; - // This won't clone-write when creating a table but likely to otherwise. - table.with_mut_schema(|s| s.update_constraint(constraint)); - - if existed { + if let RowRefInsertion::Existed(_) = constraint_row.1 { log::trace!("CONSTRAINT ALREADY EXISTS: {constraint_id}"); - } else { - log::trace!("CONSTRAINT CREATED: {constraint_id}"); + return Ok(constraint_id); } - Ok(constraint_id) - } + let (tx_table, _, _, commit_table, _) = self.get_or_create_insert_table_mut(table_id)?; + constraint.constraint_id = constraint_id; + // This won't clone-write when creating a table but likely to otherwise. + tx_table.with_mut_schema(|s| s.update_constraint(constraint.clone())); + commit_table.with_mut_schema(|s| s.update_constraint(constraint)); + self.push_schema_change(PendingSchemaChange::ConstraintAdded(table_id, constraint_id)); - fn get_insert_table_mut(&mut self, table_id: TableId) -> Result<&mut Table> { - self.tx_state - .get_table_and_blob_store(table_id) - .map(|(tbl, _)| tbl) - .ok_or_else(|| TableError::IdNotFoundState(table_id).into()) + log::trace!("CONSTRAINT CREATED: {constraint_id}"); + Ok(constraint_id) } pub fn drop_constraint(&mut self, constraint_id: ConstraintId) -> Result<()> { @@ -943,12 +932,19 @@ impl MutTxId { self.delete(ST_CONSTRAINT_ID, st_constraint_ref.pointer())?; // Remove constraint in transaction's insert table. - let (table, ..) = self.get_or_create_insert_table_mut(table_id)?; + let (tx_table, _, _, commit_table, _) = self.get_or_create_insert_table_mut(table_id)?; // This likely will do a clone-write as over time? // The schema might have found other referents. 
- table.with_mut_schema(|s| s.remove_constraint(constraint_id)); + tx_table.with_mut_schema(|s| s.remove_constraint(constraint_id)); + let schema = commit_table + .with_mut_schema(|s| s.remove_constraint(constraint_id)) + .expect("there should be a schema in the committed state if we reach here"); + self.push_schema_change(PendingSchemaChange::ConstraintRemoved(table_id, schema)); // TODO(1.0): we should also re-initialize `table` without a unique constraint. // unless some other unique constraint on the same columns exists. + // NOTE(centril): is this already handled by dropping the corresponding index? + // Probably not in the case where an index + // with the same name goes from being unique to not unique. Ok(()) } @@ -1148,7 +1144,6 @@ impl MutTxId { ctx: self.ctx, metrics: ExecutionMetrics::default(), }; - (tx_data, tx_metrics, tx) } @@ -1157,7 +1152,10 @@ impl MutTxId { /// Returns: /// - [`TxMetrics`], various measurements of the work performed by this transaction. /// - `String`, the name of the reducer which ran during this transaction. - pub fn rollback(self) -> (TxMetrics, String) { + pub fn rollback(mut self) -> (TxMetrics, String) { + self.committed_state_write_lock + .rollback(&mut self.sequence_state_lock, self.tx_state); + // Compute and keep enough info that we can // record metrics after the transaction has ended // and after the lock has been dropped. @@ -1183,6 +1181,9 @@ impl MutTxId { /// - [`TxMetrics`], various measurements of the work performed by this transaction. /// - [`TxId`], a read-only transaction with a shared lock on the committed state. pub fn rollback_downgrade(mut self, workload: Workload) -> (TxMetrics, TxId) { + self.committed_state_write_lock + .rollback(&mut self.sequence_state_lock, self.tx_state); + // Compute and keep enough info that we can // record metrics after the transaction has ended // and after the lock has been dropped. 
@@ -1235,17 +1236,15 @@ pub struct IndexScanRanged<'a> { } enum IndexScanRangedInner<'a> { - Empty(iter::Empty>), - TxOnly(IndexScanRangeIter<'a>), CommitOnly(IndexScanRangeIter<'a>), - CommitOnlyWithDeletes(IndexScanFilterDeleted<'a>), + CommitOnlyWithDeletes(FilterDeleted<'a, IndexScanRangeIter<'a>>), Both(iter::Chain, IndexScanRangeIter<'a>>), - BothWithDeletes(iter::Chain, IndexScanFilterDeleted<'a>>), + BothWithDeletes(iter::Chain, FilterDeleted<'a, IndexScanRangeIter<'a>>>), } -struct IndexScanFilterDeleted<'a> { - iter: IndexScanRangeIter<'a>, - deletes: &'a DeleteTable, +pub(super) struct FilterDeleted<'a, I> { + pub(super) iter: I, + pub(super) deletes: &'a DeleteTable, } impl<'a> Iterator for IndexScanRanged<'a> { @@ -1253,8 +1252,6 @@ impl<'a> Iterator for IndexScanRanged<'a> { fn next(&mut self) -> Option { match &mut self.inner { - IndexScanRangedInner::Empty(it) => it.next(), - IndexScanRangedInner::TxOnly(it) => it.next(), IndexScanRangedInner::CommitOnly(it) => it.next(), IndexScanRangedInner::CommitOnlyWithDeletes(it) => it.next(), IndexScanRangedInner::Both(it) => it.next(), @@ -1263,7 +1260,7 @@ impl<'a> Iterator for IndexScanRanged<'a> { } } -impl<'a> Iterator for IndexScanFilterDeleted<'a> { +impl<'a, I: Iterator>> Iterator for FilterDeleted<'a, I> { type Item = RowRef<'a>; fn next(&mut self) -> Option { self.iter.find(|row| !self.deletes.contains(row.pointer())) @@ -1292,6 +1289,8 @@ impl MutTxId { if let Some(ptr) = self .iter_by_col_eq( ST_CLIENT_ID, + // TODO(perf, minor, centril): consider a `const_col_list([x, ..])` + // so we know this is not computed at runtime. col_list![StClientFields::Identity, StClientFields::ConnectionId], &AlgebraicValue::product(row), )? @@ -1340,178 +1339,230 @@ impl MutTxId { table_id: TableId, row: &[u8], ) -> Result<(ColList, RowRefInsertion<'_>, InsertFlags)> { - // Get the insert table, so we can write the row into it. - let (tx_table, tx_blob_store, ..) 
= self - .tx_state - .get_table_and_blob_store_or_maybe_create_from( - table_id, - self.committed_state_write_lock.get_table(table_id), - ) - .ok_or(TableError::IdNotFoundState(table_id))?; - - let insert_flags = InsertFlags { - is_scheduler_table: tx_table.is_scheduler(), - }; - - // 1. Insert the physical row. - let page_pool = &mut self.committed_state_write_lock.page_pool; - let (tx_row_ref, blob_bytes) = tx_table.insert_physically_bsatn(page_pool, tx_blob_store, row)?; - // 2. Optionally: Detect, generate, write sequence values. - // 3. Confirm that the insertion respects constraints and update statistics. - // 4. Post condition (PC.INS.1): - // `res = Ok((hash, ptr))` - // => `ptr` refers to a valid row in `table_id` for `tx_table` - // ∧ `hash` is the hash of this row - // This follows from both `if/else` branches leading to `confirm_insertion` - // which both entail the above post-condition. - let ((tx_table, tx_blob_store, delete_table), gen_cols, res) = if GENERATE { - // When `GENERATE` is enabled, we're instructed to deal with sequence value generation. - // Collect all the columns with sequences that need generation. - let tx_row_ptr = tx_row_ref.pointer(); - let (cols_to_gen, seqs_to_use) = unsafe { tx_table.sequence_triggers_for(tx_blob_store, tx_row_ptr) }; - - // Generate a value for every column in the row that needs it. - let mut seq_vals: SmallVec<[i128; 1]> = <_>::default(); - for sequence_id in seqs_to_use { - seq_vals.push(self.get_next_sequence_value(sequence_id)?); - } - - // Write the generated values to the physical row at `tx_row_ptr`. - // We assume here that column with a sequence is of a sequence-compatible type. - // SAFETY: By virtue of `get_table_and_blob_store_or_maybe_create_from` above succeeding, - // we can assume we have an insert and delete table. 
- let (tx_table, tx_blob_store, delete_table) = - unsafe { self.tx_state.assume_present_get_mut_table(table_id) }; - for (col_id, seq_val) in cols_to_gen.iter().zip(seq_vals) { - // SAFETY: - // - `self.is_row_present(row)` holds as we haven't deleted the row. - // - `col_id` is a valid column, and has a sequence, so it must have a primitive type. - unsafe { tx_table.write_gen_val_to_col(col_id, tx_row_ptr, seq_val) }; - } - - // `CHECK_SAME_ROW = true`, as there might be an identical row already in the tx state. - // SAFETY: `self.is_row_present(row)` holds as we still haven't deleted the row, - // in particular, the `write_gen_val_to_col` call does not remove the row. - let res = unsafe { tx_table.confirm_insertion::(tx_blob_store, tx_row_ptr, blob_bytes) }; - ((tx_table, tx_blob_store, delete_table), cols_to_gen, res) - } else { - // When `GENERATE` is not enabled, simply confirm the insertion. - // This branch is hit when inside sequence generation itself, to avoid infinite recursion. - let tx_row_ptr = tx_row_ref.pointer(); - // `CHECK_SAME_ROW = true`, as there might be an identical row already in the tx state. - // SAFETY: `self.is_row_present(row)` holds as we just inserted the row. - let res = unsafe { tx_table.confirm_insertion::(tx_blob_store, tx_row_ptr, blob_bytes) }; - // SAFETY: By virtue of `get_table_and_blob_store_or_maybe_create_from` above succeeding, - // we can assume we have an insert and delete table. - ( - unsafe { self.tx_state.assume_present_get_mut_table(table_id) }, - ColList::empty(), - res, - ) - }; - - match res { - Ok((tx_row_hash, tx_row_ptr)) => { - if let Some(commit_table) = self.committed_state_write_lock.get_table(table_id) { - // The `tx_row_ref` was not previously present in insert tables, - // but may still be a set-semantic conflict - // or may violate a unique constraint with a row in the committed state. - // We'll check the set-semantic aspect in (1) and the constraint in (2). 
- - // (1) Rule out a set-semantic conflict with the committed state. - // SAFETY: - // - `commit_table` and `tx_table` use the same schema - // because `tx_table` is derived from `commit_table`. - // - `tx_row_ptr` is correct per (PC.INS.1). - if let (_, Some(commit_ptr)) = - unsafe { Table::find_same_row(commit_table, tx_table, tx_blob_store, tx_row_ptr, tx_row_hash) } - { - // (insert_undelete) - // ----------------------------------------------------- - // If `row` was already present in the committed state, - // either this is a set-semantic duplicate, - // or the row is marked as deleted, so we will undelete it - // and leave it in the committed state. - // Either way, it should not appear in the insert tables, - // so roll back the insertion. - // - // NOTE for future MVCC implementors: - // In MVCC, it is no longer valid to elide inserts in this way. - // When a transaction inserts a row, that row *must* appear in its insert tables, - // even if the row is already present in the committed state. - // - // Imagine a chain of committed but un-squashed transactions: - // `Committed 0: Insert Row A` - `Committed 1: Delete Row A` - // where `Committed 1` happens after `Committed 0`. - // Imagine a transaction `Running 2: Insert Row A`, - // which began before `Committed 1` was committed. - // Because `Committed 1` has since been committed, - // `Running 2` *must* happen after `Committed 1`. - // Therefore, the correct sequence of events is: - // - Insert Row A - // - Delete Row A - // - Insert Row A - // This is impossible to recover if `Running 2` elides its insert. - tx_table - .delete(tx_blob_store, tx_row_ptr, |_| ()) - .expect("Failed to delete a row we just inserted"); - - // It's possible that `row` appears in the committed state, - // but is marked as deleted. - // In this case, undelete it, so it remains in the committed state. - delete_table.remove(commit_ptr); - - // No new row was inserted, but return `committed_ptr`. 
- let blob_store = &self.committed_state_write_lock.blob_store; - let rri = RowRefInsertion::Existed( - // SAFETY: `find_same_row` told us that `ptr` refers to a valid row in `commit_table`. - unsafe { commit_table.get_row_ref_unchecked(blob_store, commit_ptr) }, - ); - return Ok((gen_cols, rri, insert_flags)); - } - - // Pacify the borrow checker. - // SAFETY: `tx_row_ptr` is still correct for `tx_table` per (PC.INS.1). - // as there haven't been any interleaving `&mut` calls that could invalidate the pointer. - let tx_row_ref = unsafe { tx_table.get_row_ref_unchecked(tx_blob_store, tx_row_ptr) }; - - // (2) The `tx_row_ref` did not violate a unique constraint *within* the `tx_table`, - // but it could do so wrt., `commit_table`, - // assuming the conflicting row hasn't been deleted since. - // Ensure that it doesn't, or roll back the insertion. - let is_deleted = |commit_ptr| delete_table.contains(commit_ptr); - // SAFETY: `commit_table.row_layout() == tx_row_ref.row_layout()` holds - // as the `tx_table` is derived from `commit_table`. - let res = unsafe { commit_table.check_unique_constraints(tx_row_ref, |ixs| ixs, is_deleted) }; - if let Err(e) = res { - // There was a constraint violation, so undo the insertion. - tx_table.delete(tx_blob_store, tx_row_ptr, |_| {}); - return Err(IndexError::from(e).into()); - } - } - - let rri = RowRefInsertion::Inserted(unsafe { - // SAFETY: `tx_row_ptr` is still correct for `tx_table` per (PC.INS.1). - // as there haven't been any interleaving `&mut` calls that could invalidate the pointer. - tx_table.get_row_ref_unchecked(tx_blob_store, tx_row_ptr) - }); - Ok((gen_cols, rri, insert_flags)) - } - // `row` previously present in insert tables; do nothing but return `ptr`. - Err(InsertError::Duplicate(DuplicateError(ptr))) => { - let rri = RowRefInsertion::Existed( - // SAFETY: `tx_table` told us that `ptr` refers to a valid row in it. 
- unsafe { tx_table.get_row_ref_unchecked(tx_blob_store, ptr) }, - ); - Ok((gen_cols, rri, insert_flags)) - } - - // Unwrap these error into `TableError::{IndexError, Bflatn}`: - Err(InsertError::IndexError(e)) => Err(IndexError::from(e).into()), - Err(InsertError::Bflatn(e)) => Err(TableError::Bflatn(e).into()), - } + insert::( + &mut self.tx_state, + &self.committed_state_write_lock, + &mut self.sequence_state_lock, + table_id, + row, + ) } +} +/// Insert a row, encoded in BSATN, into a table. +/// +/// Zero placeholders, i.e., sequence triggers, +/// in auto-inc columns in the new row will be replaced with generated values +/// if and only if `GENERATE` is true. +/// This method is called with `GENERATE` false when updating the `st_sequence` system table. +/// +/// Requires: +/// - `table_id` must refer to a valid table for the database at `database_identity`. +/// - `row` must be a valid row for the table at `table_id`. +/// +/// Returns: +/// - list of columns which have been replaced with generated values. +/// - a pointer to the inserted row. +/// - the number of bytes added to the tx blob store. +/// - The "tx table for insertion" for further processing. +/// - The "commit table for insertion" for further processing. +fn insert_physically_maybe_generate<'a, const GENERATE: bool>( + tx_state: &'a mut TxState, + committed_state: &'a CommittedState, + seq_state: &mut SequencesState, + table_id: TableId, + row: &[u8], +) -> Result<( + RowPointer, + ColList, + BlobNumBytes, + TxTableForInsertion<'a>, + CommitTableForInsertion<'a>, +)> { + // Get commit table and friends. + let commit_parts = committed_state.get_table_and_blob_store(table_id)?; + let (commit_table, ..) = commit_parts; + + // Get the insert table, so we can write the row into it. + let (tx_table, tx_blob_store, _) = tx_state.get_table_and_blob_store_or_create_from(table_id, commit_table); + + // 1. Insert the physical row. 
+ let page_pool = &committed_state.page_pool; + let (tx_row_ref, blob_bytes) = tx_table.insert_physically_bsatn(page_pool, tx_blob_store, row)?; + let tx_row_ptr = tx_row_ref.pointer(); + // 2. Optionally: Detect, generate, write sequence values. + let (tx_parts, gen_cols) = if GENERATE { + // When `GENERATE` is enabled, we're instructed to deal with sequence value generation. + // Collect all the columns with sequences that need generation. + let (cols_to_gen, seqs_to_use) = unsafe { tx_table.sequence_triggers_for(tx_blob_store, tx_row_ptr) }; + + // Generate a value for every column in the row that needs it. + let mut seq_vals: SmallVec<[i128; 1]> = <_>::default(); + for sequence_id in seqs_to_use { + seq_vals.push(get_next_sequence_value( + tx_state, + committed_state, + seq_state, + sequence_id, + )?); + } + + // Write the generated values to the physical row at `tx_row_ptr`. + // We assume here that column with a sequence is of a sequence-compatible type. + // SAFETY: After `get_table_and_blob_store_or_create_from` there's a insert and delete table. + let (tx_table, tx_blob_store, delete_table) = unsafe { tx_state.assume_present_get_mut_table(table_id) }; + for (col_id, seq_val) in cols_to_gen.iter().zip(seq_vals) { + // SAFETY: + // - `self.is_row_present(row)` holds as we haven't deleted the row. + // - `col_id` is a valid column, and has a sequence, so it must have a primitive type. + unsafe { tx_table.write_gen_val_to_col(col_id, tx_row_ptr, seq_val) }; + } + + ((tx_table, tx_blob_store, delete_table), cols_to_gen) + } else { + // When `GENERATE` is not enabled, avoid sequence generation. + // This branch is hit when inside sequence generation itself, to avoid infinite recursion. + // SAFETY: After `get_table_and_blob_store_or_create_from` there's a insert and delete table. 
+ let tx_parts = unsafe { tx_state.assume_present_get_mut_table(table_id) }; + (tx_parts, ColList::empty()) + }; + + Ok((tx_row_ptr, gen_cols, blob_bytes, tx_parts, commit_parts)) +} + +/// Insert a row, encoded in BSATN, into a table. +/// +/// Zero placeholders, i.e., sequence triggers, +/// in auto-inc columns in the new row will be replaced with generated values +/// if and only if `GENERATE` is true. +/// This method is called with `GENERATE` false when updating the `st_sequence` system table. +/// +/// Requires: +/// - `table_id` must refer to a valid table for the database at `database_identity`. +/// - `row` must be a valid row for the table at `table_id`. +/// +/// Returns: +/// - a list of columns which have been replaced with generated values. +/// - a ref to the inserted row. +/// - any insert flags. +pub(super) fn insert<'a, const GENERATE: bool>( + tx_state: &'a mut TxState, + committed_state: &'a CommittedState, + seq_state: &mut SequencesState, + table_id: TableId, + row: &[u8], +) -> Result<(ColList, RowRefInsertion<'a>, InsertFlags)> { + let ( + tx_row_ptr, + gen_cols, + blob_bytes, + (tx_table, tx_blob_store, delete_table), + (commit_table, commit_blob_store, _), + ) = insert_physically_maybe_generate::(tx_state, committed_state, seq_state, table_id, row)?; + + let insert_flags = InsertFlags { + is_scheduler_table: tx_table.is_scheduler(), + }; + let ok = |row_ref| Ok((gen_cols, row_ref, insert_flags)); + + // `CHECK_SAME_ROW = true`, as there might be an identical row already in the tx state. + // SAFETY: `tx_table.is_row_present(row)` holds as we still haven't deleted the row, + // in particular, the `write_gen_val_to_col` call does not remove the row. 
+ let res = unsafe { tx_table.confirm_insertion::(tx_blob_store, tx_row_ptr, blob_bytes) }; + + match res { + Ok((tx_row_hash, tx_row_ptr)) => { + // The `tx_row_ref` was not previously present in insert tables, + // but may still be a set-semantic conflict + // or may violate a unique constraint with a row in the committed state. + // We'll check the set-semantic aspect in (1) and the constraint in (2). + + // (1) Rule out a set-semantic conflict with the committed state. + // SAFETY: + // - `commit_table` and `tx_table` use the same schema + // because `tx_table` is derived from `commit_table`. + // - `tx_row_ptr` is correct per post-condition of `tx_table.confirm_insertion(...)`. + if let (_, Some(commit_ptr)) = + unsafe { Table::find_same_row(commit_table, tx_table, tx_blob_store, tx_row_ptr, tx_row_hash) } + { + // (insert_undelete) + // ----------------------------------------------------- + // If `row` was already present in the committed state, + // either this is a set-semantic duplicate, + // or the row is marked as deleted, so we will undelete it + // and leave it in the committed state. + // Either way, it should not appear in the insert tables, + // so roll back the insertion. + // + // NOTE for future MVCC implementors: + // In MVCC, it is no longer valid to elide inserts in this way. + // When a transaction inserts a row, that row *must* appear in its insert tables, + // even if the row is already present in the committed state. + // + // Imagine a chain of committed but un-squashed transactions: + // `Committed 0: Insert Row A` - `Committed 1: Delete Row A` + // where `Committed 1` happens after `Committed 0`. + // Imagine a transaction `Running 2: Insert Row A`, + // which began before `Committed 1` was committed. + // Because `Committed 1` has since been committed, + // `Running 2` *must* happen after `Committed 1`. 
+ // Therefore, the correct sequence of events is: + // - Insert Row A + // - Delete Row A + // - Insert Row A + // This is impossible to recover if `Running 2` elides its insert. + tx_table + .delete(tx_blob_store, tx_row_ptr, |_| ()) + .expect("Failed to delete a row we just inserted"); + + // It's possible that `row` appears in the committed state, + // but is marked as deleted. + // In this case, undelete it, so it remains in the committed state. + delete_table.remove(commit_ptr); + + // No new row was inserted, but return `committed_ptr`. + // SAFETY: `find_same_row` told us that `ptr` refers to a valid row in `commit_table`. + let row_ref = unsafe { commit_table.get_row_ref_unchecked(commit_blob_store, commit_ptr) }; + return ok(RowRefInsertion::Existed(row_ref)); + } + + // Pacify the borrow checker. + // SAFETY: `tx_row_ptr` is still correct for `tx_table` per (PC.INS.1). + // as there haven't been any interleaving `&mut` calls that could invalidate the pointer. + let tx_row_ref = unsafe { tx_table.get_row_ref_unchecked(tx_blob_store, tx_row_ptr) }; + + // (2) The `tx_row_ref` did not violate a unique constraint *within* the `tx_table`, + // but it could do so wrt., `commit_table`, + // assuming the conflicting row hasn't been deleted since. + // Ensure that it doesn't, or roll back the insertion. + let is_deleted = |commit_ptr| delete_table.contains(commit_ptr); + // SAFETY: `commit_table.row_layout() == tx_row_ref.row_layout()` holds + // as the `tx_table` is derived from `commit_table`. + let res = unsafe { commit_table.check_unique_constraints(tx_row_ref, |ixs| ixs, is_deleted) }; + if let Err(e) = res { + // There was a constraint violation, so undo the insertion. + tx_table.delete(tx_blob_store, tx_row_ptr, |_| {}); + return Err(IndexError::from(e).into()); + } + + // SAFETY: `tx_row_ptr` is still correct for `tx_table` per (PC.INS.1). + // as there haven't been any interleaving `&mut` calls that could invalidate the pointer. 
+ let row_ref = unsafe { tx_table.get_row_ref_unchecked(tx_blob_store, tx_row_ptr) }; + ok(RowRefInsertion::Inserted(row_ref)) + } + // `row` previously present in insert tables; do nothing but return `ptr`. + Err(InsertError::Duplicate(DuplicateError(ptr))) => { + // SAFETY: `tx_table` told us that `ptr` refers to a valid row in it. + let row_ref = unsafe { tx_table.get_row_ref_unchecked(tx_blob_store, ptr) }; + ok(RowRefInsertion::Existed(row_ref)) + } + // Unwrap these error into `TableError::{IndexError, Bflatn}`: + Err(InsertError::IndexError(e)) => Err(IndexError::from(e).into()), + Err(InsertError::Bflatn(e)) => Err(TableError::Bflatn(e).into()), + } +} + +impl MutTxId { /// Update a row, encoded in BSATN, into a table. /// /// Zero placeholders, i.e., sequence triggers, @@ -1533,147 +1584,109 @@ impl MutTxId { table_id: TableId, index_id: IndexId, row: &[u8], - ) -> Result<(ColList, RowRef<'_>, UpdateFlags)> { - let tx_removed_index = self.tx_state_removed_index(index_id); - - // 1. Insert the physical row into the tx insert table. - //---------------------------------------------------------------------- + ) -> Result<(ColList, RowRefInsertion<'_>, UpdateFlags)> { + // Insert the physical row into the tx insert table + // and possibly generate sequence values. + // // As we are provided the `row` encoded in BSATN, // and since we don't have a convenient way to BSATN to a set of columns, // we cannot really do an in-place update in the row-was-in-tx-state case. // So we will begin instead by inserting the row physically to the tx state and project that. - let (tx_table, tx_blob_store, ..) = self - .tx_state - .get_table_and_blob_store_or_maybe_create_from( - table_id, - self.committed_state_write_lock.get_table(table_id), - ) - .ok_or(TableError::IdNotFoundState(table_id))?; - let page_pool = &mut self.committed_state_write_lock.page_pool; - let (tx_row_ref, blob_bytes) = tx_table.insert_physically_bsatn(page_pool, tx_blob_store, row)?; - - // 2. 
Detect, generate, write sequence values in the new row. - //---------------------------------------------------------------------- - // Unlike the `fn insert(...)` case, this is not conditional on a `GENERATE` flag. - // Collect all the columns with sequences that need generation. - let tx_row_ptr = tx_row_ref.pointer(); - let (cols_to_gen, seqs_to_use) = unsafe { tx_table.sequence_triggers_for(tx_blob_store, tx_row_ptr) }; - // Generate a value for every column in the row that needs it. - let mut seq_vals: SmallVec<[i128; 1]> = <_>::default(); - for sequence_id in seqs_to_use { - seq_vals.push(self.get_next_sequence_value(sequence_id)?); - } - // Write the generated values to the physical row at `tx_row_ptr`. - // We assume here that column with a sequence is of a sequence-compatible type. - // SAFETY: By virtue of `get_table_and_blob_store_or_maybe_create_from` above succeeding, - // we can assume we have an insert and delete table. - let (tx_table, tx_blob_store, del_table) = unsafe { self.tx_state.assume_present_get_mut_table(table_id) }; - for (col_id, seq_val) in cols_to_gen.iter().zip(seq_vals) { - // SAFETY: - // - `self.is_row_present(row)` holds as we haven't deleted the row. - // - `col_id` is a valid column, and has a sequence, so it must have a primitive type. - unsafe { tx_table.write_gen_val_to_col(col_id, tx_row_ptr, seq_val) }; - } - // SAFETY: `tx_table.is_row_present(tx_row_ptr)` holds as we haven't deleted it yet. - let tx_row_ref = unsafe { tx_table.get_row_ref_unchecked(tx_blob_store, tx_row_ptr) }; + let ( + tx_row_ptr, + cols_to_gen, + blob_bytes, + (tx_table, tx_blob_store, del_table), + (commit_table, commit_blob_store, _), + ) = insert_physically_maybe_generate::( + &mut self.tx_state, + &self.committed_state_write_lock, + &mut self.sequence_state_lock, + table_id, + row, + )?; let update_flags = UpdateFlags { is_scheduler_table: tx_table.is_scheduler(), }; + let ok = |row_ref| Ok((cols_to_gen, row_ref, update_flags)); - // 3. 
Find the old row and remove it. - //---------------------------------------------------------------------- - #[inline] - fn ensure_unique(index_id: IndexId, index: TableAndIndex<'_>) -> Result<()> { - if !index.index().is_unique() { - return Err(IndexError::NotUnique(index_id).into()); + // SAFETY: `tx_table.is_row_present(tx_row_ptr)` holds as we just inserted it. + let tx_row_ref = unsafe { tx_table.get_row_ref_unchecked(tx_blob_store, tx_row_ptr) }; + + let err = 'error: { + // These two macros can be thought of as a `throw $e` and `$e?` within `'error`. + // TODO(centril): Get rid of this once we have stable `try` blocks or polonius. + macro_rules! throw { + ($e:expr) => { + break 'error $e.into() + }; } - Ok(()) - } - /// Ensure that the new row does not violate the commit table's unique constraints. - #[inline] - fn check_commit_unique_constraints( - commit_table: &Table, - del_table: &DeleteTable, - ignore_index_id: IndexId, - new_row: RowRef<'_>, - old_ptr: RowPointer, - ) -> Result<()> { - let is_deleted = |commit_ptr| commit_ptr == old_ptr || del_table.contains(commit_ptr); + macro_rules! unwrap { + ($e:expr) => { + match $e { + Ok(x) => x, + Err(e) => throw!(e), + } + }; + } + + // Check that the index exists and is unique. + // It's sufficient to check the committed state. + let Some(commit_index) = commit_table.get_index_by_id(index_id) else { + throw!(IndexError::NotFound(index_id)); + }; + if !commit_index.is_unique() { + throw!(IndexError::NotUnique(index_id)); + } + + // Project the row to the index's type. + // SAFETY: `tx_row_ref`'s table is derived from `commit_index`'s table, + // so all `index.indexed_columns` will be in-bounds of the row layout. + let index_key = unsafe { tx_row_ref.project_unchecked(&commit_index.indexed_columns) }; + + // Try to find the old row first in the committed state using the `index_key`. 
+ let mut old_commit_del_ptr = None; + let commit_old_ptr = commit_index.seek_point(&index_key).next().filter(|&ptr| { + // Was committed row previously deleted in this TX? + let deleted = del_table.contains(ptr); + // If so, remember it in case it was identical to the new row. + old_commit_del_ptr = deleted.then_some(ptr); + !deleted + }); + + // Ensure that the new row does not violate other commit table unique constraints. + let is_deleted = |commit_ptr| { + commit_old_ptr.is_some_and(|old_ptr| old_ptr == commit_ptr) || del_table.contains(commit_ptr) + }; // SAFETY: `commit_table.row_layout() == new_row.row_layout()` holds // as the `tx_table` is derived from `commit_table`. - let res = unsafe { + unwrap!(unsafe { commit_table.check_unique_constraints( - new_row, + tx_row_ref, // Don't check this index since we'll do a 1-1 old/new replacement. - |ixs| ixs.filter(|(&id, _)| id != ignore_index_id), + |ixs| ixs.filter(|(&id, _)| id != index_id), is_deleted, ) - }; - res.map_err(IndexError::from).map_err(Into::into) - } - /// Projects the new row to the index to find the old row. - #[inline] - fn find_old_row(new_row: RowRef<'_>, index: TableAndIndex<'_>) -> (Option, AlgebraicValue) { - let index = index.index(); - // Project the row to the index's columns/type. - // SAFETY: `new_row` belongs to the same table as `index`, - // so all `index.indexed_columns` will be in-bounds of the row layout. - let needle = unsafe { new_row.project_unchecked(&index.indexed_columns) }; - // Find the old row. - (index.seek_point(&needle).next(), needle) - } + } + .map_err(IndexError::from)); - // The index we've been directed to use must exist - // either in the committed state or in the tx state. - // In the former case, the index must not have been removed in the transaction. - // As it's unlikely that an index was added in this transaction, - // we begin by checking the committed state. 
- let commit_blob_store = &self.committed_state_write_lock.blob_store; - let mut old_commit_del_ptr = None; - let err = 'failed_rev_ins: { - let tx_row_ptr = if tx_removed_index { - break 'failed_rev_ins IndexError::NotFound(index_id).into(); - } else if let Some((commit_index, old_ptr)) = - // Find the committed state index, project the row to it, and find the old row. - // The old row must not have been deleted. + let tx_row_ptr = if let Some(old_ptr) = commit_old_ptr { + // Row was found in the committed state! // - // If the old row wasn't found, it may still exist in the tx state, - // which inherits the index structure of the committed state, - // so we'd like to avoid an early error in that case. - self - .committed_state_write_lock - .get_index_by_id_with_table(table_id, index_id) - .and_then(|index| find_old_row(tx_row_ref, index).0.map(|ptr| (index, ptr))) - .filter(|&(_, ptr)| { - // Was committed row previously deleted in this TX? - let deleted = del_table.contains(ptr); - // If so, remember it in case it was identical to the new row. - old_commit_del_ptr = deleted.then_some(ptr); - !deleted - }) - { - // 1. Ensure the index is unique. - // 2. Ensure the new row doesn't violate any other committed state unique indices. - let commit_table = commit_index.table(); - if let Err(e) = ensure_unique(index_id, commit_index).and_then(|_| { - check_commit_unique_constraints(commit_table, del_table, index_id, tx_row_ref, old_ptr) - }) { - break 'failed_rev_ins e; - } - // If the new row is the same as the old, // skip the update altogether to match the semantics of `Self::insert`. + // // SAFETY: // 1. `tx_table` is derived from `commit_table` so they have the same layouts. // 2. `old_ptr` was found in an index of `commit_table`, so we know it is valid. // 3. we just inserted `tx_row_ptr` into `tx_table`, so we know it is valid. 
if unsafe { Table::eq_row_in_page(commit_table, old_ptr, tx_table, tx_row_ptr) } { - // SAFETY: `self.is_row_present(tx_row_ptr)` holds, as noted in 3. + // SAFETY: `tx_table.is_row_present(tx_row_ptr)` holds, as noted in 3. unsafe { tx_table.delete_internal_skip_pointer_map(tx_blob_store, tx_row_ptr) }; // SAFETY: `commit_table.is_row_present(old_ptr)` holds, as noted in 2. - let old_row_ref = unsafe { commit_table.get_row_ref_unchecked(commit_blob_store, old_ptr) }; - return Ok((cols_to_gen, old_row_ref, update_flags)); + let row_ref = unsafe { commit_table.get_row_ref_unchecked(commit_blob_store, old_ptr) }; + return ok(RowRefInsertion::Existed(row_ref)); } // Check constraints and confirm the insertion of the new row. @@ -1686,236 +1699,164 @@ impl MutTxId { // but it cannot, as the committed state already has `X` for `C`. // So we don't need to check the tx state for a duplicate row. // - // SAFETY: `self.is_row_present(row)` holds as we still haven't deleted the row, + // SAFETY: `tx_table.is_row_present(row)` holds as we still haven't deleted the row, // in particular, the `write_gen_val_to_col` call does not remove the row. // On error, `tx_row_ptr` has already been removed, so don't do it again. let (_, tx_row_ptr) = - unsafe { tx_table.confirm_insertion::(tx_blob_store, tx_row_ptr, blob_bytes) }?; + unwrap!(unsafe { tx_table.confirm_insertion::(tx_blob_store, tx_row_ptr, blob_bytes) }); + // Delete the old row. del_table.insert(old_ptr); tx_row_ptr - } else if let Some(tx_index) = - // Either the row was not found in the committed state index, - // or the index was added in our tx state. - // In the latter case, committed state rows will be present in the index, - // so we must handle those specially. - tx_table.get_index_by_id_with_table(tx_blob_store, index_id) + } else if let Some(old_ptr) = tx_table + .get_index_by_id(index_id) + .and_then(|index| index.seek_point(&index_key).next()) { - // 0. Find the old row. - // 1. Ensure the index is unique. 
- // 2. Ensure the new row doesn't violate any other committed state unique indices. - let (old_ptr, needle) = find_old_row(tx_row_ref, tx_index); - let commit_table = self.committed_state_write_lock.get_table(table_id); - let res = old_ptr - // If we have an old committed state row, ensure it hasn't been deleted in our tx. - .filter(|ptr| ptr.squashed_offset() == SquashedOffset::TX_STATE || !del_table.contains(*ptr)) - .ok_or_else(|| IndexError::KeyNotFound(index_id, needle).into()) - .and_then(|old_ptr| { - ensure_unique(index_id, tx_index)?; - if let Some(commit_table) = commit_table { - check_commit_unique_constraints(commit_table, del_table, index_id, tx_row_ref, old_ptr)?; - } - Ok(old_ptr) - }); - let old_ptr = match res { - Err(e) => break 'failed_rev_ins e, - Ok(x) => x, - }; + // Row was found in the tx state! + // + // Check constraints and confirm the update of the new row. + // This ensures that the old row is removed from the indices + // before attempting to insert the new row into the indices. + // + // SAFETY: `tx_table.is_row_present(tx_row_ptr)` and `tx_table.is_row_present(old_ptr)` both hold + // as we've deleted neither. + // In particular, the `write_gen_val_to_col` call does not remove the row. + let tx_row_ptr = + unwrap!(unsafe { tx_table.confirm_update(tx_blob_store, tx_row_ptr, old_ptr, blob_bytes) }); - match old_ptr.squashed_offset() { - SquashedOffset::COMMITTED_STATE => { - if let Some(commit_table) = commit_table { - // If the new row is the same as the old, - // skip the update altogether to match the semantics of `Self::insert`. - // SAFETY: - // 1. `tx_table` is derived from `commit_table` so they have the same layouts. - // 2. `old_ptr` was found in an index of `tx_table`, - // but we had `SquashedOffset::COMMITTED_STATE`, - // so we know it is valid for `commit_table`. - // 3. we just inserted `tx_row_ptr` into `tx_table`, so we know it is valid. 
- if unsafe { Table::eq_row_in_page(commit_table, old_ptr, tx_table, tx_row_ptr) } { - // SAFETY: `self.is_row_present(tx_row_ptr)` holds, as noted in 3. - unsafe { tx_table.delete_internal_skip_pointer_map(tx_blob_store, tx_row_ptr) }; - // SAFETY: `commit_table.is_row_present(old_ptr)` holds, as noted in 2. - let old_row_ref = - unsafe { commit_table.get_row_ref_unchecked(commit_blob_store, old_ptr) }; - return Ok((cols_to_gen, old_row_ref, update_flags)); - } - } + if let Some(old_commit_del_ptr) = old_commit_del_ptr { + // If we have an identical deleted row in the committed state, + // we need to undelete it, just like in `Self::insert`. + // The same note (`insert_undelete`) there re. MVCC applies here as well. + // + // SAFETY: + // 1. `tx_table` is derived from `commit_table` so they have the same layouts. + // 2. `old_commit_del_ptr` was found in an index of `commit_table`. + // 3. we just inserted `tx_row_ptr` into `tx_table`, so we know it is valid. + if unsafe { Table::eq_row_in_page(commit_table, old_commit_del_ptr, tx_table, tx_row_ptr) } { + // It is important that we `confirm_update` first, + // as we must ensure that undeleting the row causes no tx state conflict. + tx_table + .delete(tx_blob_store, tx_row_ptr, |_| ()) + .expect("Failed to delete a row we just inserted"); - // Check constraints and confirm the insertion of the new row. - // - // `CHECK_SAME_ROW = false`, - // as we know there's a row (`old_ptr`) in the committed state with, - // for columns `C`, a unique value X. - // For `row` to be identical to another row in the tx state, - // it must have the value `X` for `C`, - // but it cannot, as the committed state already has `X` for `C`. - // So we don't need to check the tx state for a duplicate row. - // - // SAFETY: `self.is_row_present(row)` holds as we still haven't deleted the row, - // in particular, the `write_gen_val_to_col` call does not remove the row. - // On error, `tx_row_ptr` has already been removed, so don't do it again. 
- let (_, tx_row_ptr) = - unsafe { tx_table.confirm_insertion::(tx_blob_store, tx_row_ptr, blob_bytes) }?; - // Delete the old row. - del_table.insert(old_ptr); - tx_row_ptr + // Undelete. + del_table.remove(old_commit_del_ptr); + + // Return the undeleted committed state row. + // SAFETY: `commit_table.is_row_present(old_commit_del_ptr)` holds. + let row_ref = + unsafe { commit_table.get_row_ref_unchecked(commit_blob_store, old_commit_del_ptr) }; + return ok(RowRefInsertion::Existed(row_ref)); } - SquashedOffset::TX_STATE => { - // Check constraints and confirm the update of the new row. - // This ensures that the old row is removed from the indices - // before attempting to insert the new row into the indices. - // - // SAFETY: `self.is_row_present(tx_row_ptr)` and `self.is_row_present(old_ptr)` both hold - // as we've deleted neither. - // In particular, the `write_gen_val_to_col` call does not remove the row. - let tx_row_ptr = - unsafe { tx_table.confirm_update(tx_blob_store, tx_row_ptr, old_ptr, blob_bytes) }?; - - if let Some(old_commit_del_ptr) = old_commit_del_ptr { - let commit_table = - commit_table.expect("previously found a row in `commit_table`, so there should be one"); - // If we have an identical deleted row in the committed state, - // we need to undeleted it, just like in `Self::insert`. - // The same note (`insert_undelete`) there re. MVCC applies here as well. - // - // SAFETY: - // 1. `tx_table` is derived from `commit_table` so they have the same layouts. - // 2. `old_commit_del_ptr` was found in an index of `commit_table`. - // 3. we just inserted `tx_row_ptr` into `tx_table`, so we know it is valid. - if unsafe { Table::eq_row_in_page(commit_table, old_commit_del_ptr, tx_table, tx_row_ptr) } - { - // It is important that we `confirm_update` first, - // as we must ensure that undeleting the row causes no tx state conflict. 
- tx_table - .delete(tx_blob_store, tx_row_ptr, |_| ()) - .expect("Failed to delete a row we just inserted"); - - // Undelete. - del_table.remove(old_commit_del_ptr); - - // Return the undeleted committed state row. - // SAFETY: `commit_table.is_row_present(old_commit_del_ptr)` holds. - let old_row_ref = unsafe { - commit_table.get_row_ref_unchecked(commit_blob_store, old_commit_del_ptr) - }; - return Ok((cols_to_gen, old_row_ref, update_flags)); - } - } - - tx_row_ptr - } - _ => unreachable!("Invalid SquashedOffset for RowPointer: {:?}", old_ptr), } + + tx_row_ptr } else { - break 'failed_rev_ins IndexError::NotFound(index_id).into(); + throw!(IndexError::KeyNotFound(index_id, index_key)); }; // SAFETY: `tx_table.is_row_present(tx_row_ptr)` holds // per post-condition of `confirm_insertion` and `confirm_update` // in the if/else branches respectively. - let tx_row_ref = unsafe { tx_table.get_row_ref_unchecked(tx_blob_store, tx_row_ptr) }; - return Ok((cols_to_gen, tx_row_ref, update_flags)); + let row_ref = unsafe { tx_table.get_row_ref_unchecked(tx_blob_store, tx_row_ptr) }; + return ok(RowRefInsertion::Inserted(row_ref)); }; // When we reach here, we had an error and we need to revert the insertion of `tx_row_ref`. - // SAFETY: `self.is_row_present(tx_row_ptr)` holds, + // SAFETY: `tx_table.is_row_present(tx_row_ptr)` holds, // as we still haven't deleted the row physically. unsafe { tx_table.delete_internal_skip_pointer_map(tx_blob_store, tx_row_ptr) }; Err(err) } pub(super) fn delete(&mut self, table_id: TableId, row_pointer: RowPointer) -> Result { - match row_pointer.squashed_offset() { - // For newly-inserted rows, - // just delete them from the insert tables - // - there's no reason to have them in both the insert and delete tables. 
- SquashedOffset::TX_STATE => { - let (table, blob_store) = self - .tx_state - .get_table_and_blob_store(table_id) - .ok_or(TableError::IdNotFoundState(table_id))?; - Ok(table.delete(blob_store, row_pointer, |_| ()).is_some()) - } - SquashedOffset::COMMITTED_STATE => { - let commit_table = self - .committed_state_write_lock - .get_table(table_id) - .expect("there's a row in committed state so there should be a committed table"); - // NOTE: We trust the `row_pointer` refers to an extant row, - // and check only that it hasn't yet been deleted. - self.tx_state - .get_delete_table_mut(table_id, commit_table) - .insert(row_pointer); - Ok(true) - } - _ => unreachable!("Invalid SquashedOffset for RowPointer: {:?}", row_pointer), - } + delete( + &mut self.tx_state, + &self.committed_state_write_lock, + table_id, + row_pointer, + ) } +} +pub(super) fn delete( + tx_state: &mut TxState, + committed_state: &CommittedState, + table_id: TableId, + row_pointer: RowPointer, +) -> Result { + match row_pointer.squashed_offset() { + // For newly-inserted rows, + // just delete them from the insert tables + // - there's no reason to have them in both the insert and delete tables. + SquashedOffset::TX_STATE => { + let (table, blob_store) = tx_state + .get_table_and_blob_store(table_id) + .ok_or(TableError::IdNotFoundState(table_id))?; + Ok(table.delete(blob_store, row_pointer, |_| ()).is_some()) + } + SquashedOffset::COMMITTED_STATE => { + let commit_table = committed_state + .get_table(table_id) + .expect("there's a row in committed state so there should be a committed table"); + // NOTE: We trust the `row_pointer` refers to an extant row, + // and check only that it hasn't yet been deleted. 
+ tx_state + .get_delete_table_mut(table_id, commit_table) + .insert(row_pointer); + Ok(true) + } + _ => unreachable!("Invalid SquashedOffset for RowPointer: {:?}", row_pointer), + } +} + +impl MutTxId { pub(super) fn delete_by_row_value(&mut self, table_id: TableId, rel: &ProductValue) -> Result { - // Four cases here: - // - Table exists in both tx_state and committed_state. - // - Temporary insert into tx_state. - // - If match exists in tx_state, delete it immediately. - // - Else if match exists in committed_state, add to delete tables. - // - Roll back temp insertion. - // - Table exists only in tx_state. - // - As above, but without else branch. - // - Table exists only in committed_state. - // - Create table in tx_state, then as above. - // - Table does not exist. - // - No such row; return false. + // Get commit table and page pool. + let page_pool = &self.committed_state_write_lock.page_pool; + let (commit_table, ..) = self.committed_state_write_lock.get_table_and_blob_store(table_id)?; - let (commit_table, page_pool) = self.committed_state_write_lock.get_table_mut(table_id); - - // If the tx table exists, get it. - // If it doesn't exist, but the commit table does, - // create the tx table using the commit table as a template. - let Some((tx_table, tx_blob_store, ..)) = self + // Temporarily insert the row into the tx insert table. + let (tx_table, tx_blob_store, _) = self .tx_state - .get_table_and_blob_store_or_maybe_create_from(table_id, commit_table.as_deref()) - else { - // If neither the committed table nor the tx table exists, - // the row can't exist, so delete nothing. - return Ok(false); - }; + .get_table_and_blob_store_or_create_from(table_id, commit_table); // We only want to physically insert the row here to get a row pointer. // We'd like to avoid any set semantic and unique constraint checks. 
let (row_ref, _) = tx_table.insert_physically_pv(page_pool, tx_blob_store, rel)?; let ptr = row_ref.pointer(); - // First, check if a matching row exists in the `tx_table`. - // If it does, no need to check the `commit_table`. + // First, check if a matching row exists in the `commit_table`. + // If it does, no need to check the `tx_table`. + // + // We start with `commit_table` as, in most cases, + // we'll likely have a transaction that deletes a committed row + // rather than deleting a row that was inserted in the same transaction. // // SAFETY: - // - `tx_table` trivially uses the same schema as itself. + // - `commit_table` and `tx_table` use the same schema. // - `ptr` is valid because we just inserted it. - // - `hash` is correct because we just computed it. - let (hash, to_delete) = unsafe { Table::find_same_row(tx_table, tx_table, tx_blob_store, ptr, None) }; + let (hash, to_delete) = unsafe { Table::find_same_row(commit_table, tx_table, tx_blob_store, ptr, None) }; let to_delete = to_delete - // Not present in insert tables? Check if present in the commit tables. + // Not present in commit table? Check if present in the tx table. .or_else(|| { - commit_table.and_then(|commit_table| { - // SAFETY: - // - `commit_table` and `tx_table` use the same schema - // - `ptr` is valid because we just inserted it. - let (_, to_delete) = - unsafe { Table::find_same_row(commit_table, tx_table, tx_blob_store, ptr, hash) }; - to_delete - }) + // SAFETY: + // - `commit_table` and `tx_table` use the same schema. + // - `ptr` is valid because we just inserted it. + let (_, to_delete) = unsafe { Table::find_same_row(tx_table, tx_table, tx_blob_store, ptr, hash) }; + to_delete }); - // Remove the temporary entry from the insert tables. - // Do this before actually deleting to drop the borrows on the tables. + // Remove the temporary entry from the tx table. + // Do this before actually deleting to drop the borrows on the table. 
// SAFETY: `ptr` is valid because we just inserted it and haven't deleted it since. unsafe { tx_table.delete_internal_skip_pointer_map(tx_blob_store, ptr); } - // Mark the committed row to be deleted by adding it to the delete table. + // Delete the found row either by marking (commit table) + // or by deleteing directly (tx table). to_delete .map(|to_delete| self.delete(table_id, to_delete)) .unwrap_or(Ok(false)) @@ -1934,34 +1875,18 @@ impl StateView for MutTxId { // TODO(bikeshedding, docs): should this also check if the schema is in the system tables, // but the table hasn't been constructed yet? // If not, document why. - self.tx_state - .insert_tables - .get(&table_id) - .or_else(|| self.committed_state_write_lock.tables.get(&table_id)) - .map(|table| table.get_schema()) + + // No need to check the tx state. + // If the table is not in the committed state, it doesn't exist. + self.committed_state_write_lock.get_schema(table_id) } fn table_row_count(&self, table_id: TableId) -> Option { - let commit_count = self.committed_state_write_lock.table_row_count(table_id); - let (tx_ins_count, tx_del_count) = self.tx_state.table_row_count(table_id); - let commit_count = commit_count.map(|cc| cc - tx_del_count); - // Keep track of whether `table_id` exists. 
- match (commit_count, tx_ins_count) { - (Some(cc), Some(ic)) => Some(cc + ic), - (Some(c), None) | (None, Some(c)) => Some(c), - (None, None) => None, - } + table_row_count(&self.tx_state, &self.committed_state_write_lock, table_id) } fn iter(&self, table_id: TableId) -> Result> { - if self.table_name(table_id).is_some() { - return Ok(IterMutTx::new( - table_id, - &self.tx_state, - &self.committed_state_write_lock, - )); - } - Err(TableError::IdNotFound(SystemTable::st_table, table_id.0).into()) + iter(&self.tx_state, &self.committed_state_write_lock, table_id) } fn iter_by_col_range>( @@ -1970,89 +1895,129 @@ impl StateView for MutTxId { cols: ColList, range: R, ) -> Result> { - // We have to index_seek in both the committed state and the current tx state. - // First, we will check modifications in the current tx. It may be that the table - // has not been modified yet in the current tx, in which case we will only search - // the committed state. Finally, the table may not be indexed at all, in which case - // we fall back to iterating the entire table. - // - // We need to check the tx_state first. In particular, it may be that the index - // was only added in the current transaction. - // TODO(george): It's unclear that we truly support dynamically creating an index - // yet. In particular, I don't know if creating an index in a transaction and - // rolling it back will leave the index in place. - if let Some(inserted_rows) = self.tx_state.index_seek_by_cols(table_id, &cols, &range) { - let committed_rows = self.committed_state_write_lock.index_seek(table_id, &cols, &range); - // The current transaction has modified this table, and the table is indexed. 
- Ok(if let Some(del_table) = self.tx_state.get_delete_table(table_id) { - IterByColRangeMutTx::IndexWithDeletes(IndexSeekIterIdWithDeletedMutTx { - inserted_rows, - committed_rows, - del_table, - }) - } else { - IterByColRangeMutTx::Index(IndexSeekIterIdMutTx { - inserted_rows, - committed_rows, - }) - }) - } else { - // Either the current transaction has not modified this table, or the table is not - // indexed. - match self.committed_state_write_lock.index_seek(table_id, &cols, &range) { - Some(committed_rows) => Ok(if let Some(del_table) = self.tx_state.get_delete_table(table_id) { - IterByColRangeMutTx::CommittedIndexWithDeletes(CommittedIndexIterWithDeletedMutTx::new( - committed_rows, - del_table, - )) - } else { - IterByColRangeMutTx::CommittedIndex(committed_rows) - }), - None => { - #[cfg(feature = "unindexed_iter_by_col_range_warn")] - match self.table_row_count(table_id) { - // TODO(ux): log these warnings to the module logs rather than host logs. - None => log::error!( - "iter_by_col_range on unindexed column, but couldn't fetch table `{table_id}`s row count", - ), - Some(num_rows) => { - const TOO_MANY_ROWS_FOR_SCAN: u64 = 1000; - if num_rows >= TOO_MANY_ROWS_FOR_SCAN { - let schema = self.schema_for_table(table_id).unwrap(); - let table_name = &schema.table_name; - let col_names = cols - .iter() - .map(|col_id| { - schema - .columns() - .get(col_id.idx()) - .map(|col| &col.col_name[..]) - .unwrap_or("[unknown column]") - }) - .collect::>(); - log::warn!( - "iter_by_col_range without index: table {table_name} has {num_rows} rows; scanning columns {col_names:?}", - ); - } - } - } - - Ok(IterByColRangeMutTx::Scan(ScanIterByColRangeMutTx::new( - self.iter(table_id)?, - cols, - range, - ))) - } - } - } + iter_by_col_range(&self.tx_state, &self.committed_state_write_lock, table_id, cols, range) } - fn iter_by_col_eq<'a, 'r>( - &'a self, + fn iter_by_col_eq<'r>( + &self, table_id: TableId, cols: impl Into, value: &'r AlgebraicValue, - ) -> Result> { - 
self.iter_by_col_range(table_id, cols.into(), value) + ) -> Result> { + iter_by_col_eq(&self.tx_state, &self.committed_state_write_lock, table_id, cols, value) + } +} + +fn table_row_count(tx_state: &TxState, committed_state: &CommittedState, table_id: TableId) -> Option { + let commit_count = committed_state.table_row_count(table_id); + let (tx_ins_count, tx_del_count) = tx_state.table_row_count(table_id); + let commit_count = commit_count.map(|cc| cc - tx_del_count); + // Keep track of whether `table_id` exists. + match (commit_count, tx_ins_count) { + (Some(cc), Some(ic)) => Some(cc + ic), + (Some(c), None) | (None, Some(c)) => Some(c), + (None, None) => None, + } +} + +fn iter<'a>(tx_state: &'a TxState, committed_state: &'a CommittedState, table_id: TableId) -> Result> { + IterMutTx::new(table_id, tx_state, committed_state) +} + +fn iter_by_col_range<'a, R: RangeBounds>( + tx_state: &'a TxState, + committed_state: &'a CommittedState, + table_id: TableId, + cols: ColList, + range: R, +) -> Result> { + // If there's an index, use that. + // It's sufficient to check that the committed state has an index + // as index schema changes are applied immediately. 
+ if let Some(commit_iter) = committed_state.index_seek(table_id, &cols, &range) { + let tx_iter = tx_state.index_seek_by_cols(table_id, &cols, &range); + let delete_table = tx_state.get_delete_table(table_id); + let iter = combine_range_index_iters(delete_table, tx_iter, commit_iter); + Ok(IterByColRangeMutTx::Index(iter)) + } else { + unindexed_iter_by_col_range_warn(tx_state, committed_state, table_id, &cols); + let iter = iter(tx_state, committed_state, table_id)?; + + Ok(IterByColRangeMutTx::Scan(ScanIterByColRangeMutTx::new( + iter, cols, range, + ))) + } +} + +fn iter_by_col_eq<'a, 'r>( + tx_state: &'a TxState, + committed_state: &'a CommittedState, + table_id: TableId, + cols: impl Into, + value: &'r AlgebraicValue, +) -> Result> { + iter_by_col_range(tx_state, committed_state, table_id, cols.into(), value) +} + +fn combine_range_index_iters<'a>( + delete_table: Option<&'a DeleteTable>, + tx_iter: Option>, + commit_iter: IndexScanRangeIter<'a>, +) -> IndexScanRanged<'a> { + // Chain together the indexed rows in the tx and committed state, + // but don't yield rows deleted in the tx state. + use itertools::Either::*; + use IndexScanRangedInner::*; + let commit_iter = match delete_table { + None => Left(commit_iter), + Some(deletes) => Right(FilterDeleted { + iter: commit_iter, + deletes, + }), + }; + // This is effectively just `tx_iter.into_iter().flatten().chain(commit_iter)`, + // but with all the branching and `Option`s flattened to just one layer. 
+ let iter = match (tx_iter, commit_iter) { + (None, Left(commit_iter)) => CommitOnly(commit_iter), + (None, Right(commit_iter)) => CommitOnlyWithDeletes(commit_iter), + (Some(tx_iter), Left(commit_iter)) => Both(tx_iter.chain(commit_iter)), + (Some(tx_iter), Right(commit_iter)) => BothWithDeletes(tx_iter.chain(commit_iter)), + }; + IndexScanRanged { inner: iter } +} + +#[cfg(not(feature = "unindexed_iter_by_col_range_warn"))] +fn unindexed_iter_by_col_range_warn(_: &TxState, _: &CommittedState, _: TableId, _: &ColList) {} + +#[cfg(feature = "unindexed_iter_by_col_range_warn")] +fn unindexed_iter_by_col_range_warn( + tx_state: &TxState, + committed_state: &CommittedState, + table_id: TableId, + cols: &ColList, +) { + match table_row_count(tx_state, committed_state, table_id) { + // TODO(ux): log these warnings to the module logs rather than host logs. + None => log::error!("iter_by_col_range on unindexed column, but couldn't fetch table `{table_id}`s row count",), + Some(num_rows) => { + const TOO_MANY_ROWS_FOR_SCAN: u64 = 1000; + if num_rows >= TOO_MANY_ROWS_FOR_SCAN { + let schema = committed_state.get_schema(table_id).unwrap(); + let table_name = &schema.table_name; + let col_names = cols + .iter() + .map(|col_id| { + schema + .columns() + .get(col_id.idx()) + .map(|col| &col.col_name[..]) + .unwrap_or("[unknown column]") + }) + .collect::>(); + log::warn!( + "iter_by_col_range without index: table {table_name} has {num_rows} rows; scanning columns {col_names:?}", + ); + } + } } } diff --git a/crates/core/src/db/datastore/locking_tx_datastore/sequence.rs b/crates/core/src/db/datastore/locking_tx_datastore/sequence.rs index 369780fe2c..705565ced2 100644 --- a/crates/core/src/db/datastore/locking_tx_datastore/sequence.rs +++ b/crates/core/src/db/datastore/locking_tx_datastore/sequence.rs @@ -123,8 +123,8 @@ impl SequencesState { self.sequences.get_mut(&seq_id) } - pub(super) fn insert(&mut self, seq_id: SequenceId, seq: Sequence) { - self.sequences.insert(seq_id, 
seq); + pub(super) fn insert(&mut self, seq: Sequence) { + self.sequences.insert(seq.id(), seq); } pub(super) fn remove(&mut self, seq_id: SequenceId) -> Option { diff --git a/crates/core/src/db/datastore/locking_tx_datastore/state_view.rs b/crates/core/src/db/datastore/locking_tx_datastore/state_view.rs index 32daa33bcd..f1d48afaab 100644 --- a/crates/core/src/db/datastore/locking_tx_datastore/state_view.rs +++ b/crates/core/src/db/datastore/locking_tx_datastore/state_view.rs @@ -1,5 +1,5 @@ -use super::{committed_state::CommittedState, datastore::Result, delete_table::DeleteTable, tx_state::TxState}; -use crate::db::datastore::locking_tx_datastore::committed_state::CommittedIndexIterWithDeletedMutTx; +use super::mut_tx::{FilterDeleted, IndexScanRanged}; +use super::{committed_state::CommittedState, datastore::Result, tx_state::TxState}; use crate::{ db::datastore::system_tables::{ StColumnFields, StColumnRow, StConstraintFields, StConstraintRow, StIndexFields, StIndexRow, StScheduledFields, @@ -157,35 +157,35 @@ pub struct IterMutTx<'a> { } impl<'a> IterMutTx<'a> { - pub(super) fn new(table_id: TableId, tx_state: &'a TxState, committed_state: &'a CommittedState) -> Self { + pub(super) fn new(table_id: TableId, tx_state: &'a TxState, committed_state: &'a CommittedState) -> Result { + // If the table exists, the committed state has it as we apply schema changes immediately. + let Some(commit_table) = committed_state.get_table(table_id) else { + return Err(TableError::IdNotFound(SystemTable::st_table, table_id.0).into()); + }; + + // I can neither confirm nor deny that we have a tx insert table. let tx_state_ins = tx_state .insert_tables .get(&table_id) .map(|table| (table, &tx_state.blob_store)); - let stage = if let Some(table) = committed_state.tables.get(&table_id) { - // The committed state has changes for this table. 
- let iter = table.scan_rows(&committed_state.blob_store); - if let Some(del_tables) = tx_state.get_delete_table(table_id) { - // There are deletes in the tx state - // so we must exclude those (1b). - ScanStage::CommittedWithTxDeletes { iter, del_tables } - } else { - // There are no deletes in the tx state - // so we don't need to care about those (1a). - ScanStage::CommittedNoTxDeletes { iter } - } + let iter = commit_table.scan_rows(&committed_state.blob_store); + let stage = if let Some(deletes) = tx_state.get_delete_table(table_id) { + // There are deletes in the tx state + // so we must exclude those (1b). + let iter = FilterDeleted { iter, deletes }; + ScanStage::CommittedWithTxDeletes { iter } } else { - ScanStage::Continue + // There are no deletes in the tx state + // so we don't need to care about those (1a). + ScanStage::CommittedNoTxDeletes { iter } }; - Self { tx_state_ins, stage } + Ok(Self { tx_state_ins, stage }) } } enum ScanStage<'a> { - /// Continue to the next stage. - Continue, /// Yielding rows from the current tx. CurrentTx { iter: TableScanIter<'a> }, /// Yielding rows from the committed state @@ -194,10 +194,7 @@ enum ScanStage<'a> { /// Yielding rows from the committed state /// but there are deleted rows in the tx state, /// so we must check against those. 
- CommittedWithTxDeletes { - iter: TableScanIter<'a>, - del_tables: &'a DeleteTable, - }, + CommittedWithTxDeletes { iter: FilterDeleted<'a, TableScanIter<'a>> }, } impl<'a> Iterator for IterMutTx<'a> { @@ -207,19 +204,11 @@ impl<'a> Iterator for IterMutTx<'a> { fn next(&mut self) -> Option { // The finite state machine goes: // - // Continue - // | - // |--> CurrentTx -------------------------------\ - // | ^ | - // | \--------------------\ | - // | ^ | - // |--> CommittedNoTxDeletes ----|---------------\ - // | ^ v - // \--> CommittedWithTxDeletes --|------/----> Stop - + // CommittedNoTxDeletes ------\ + // |----> CurrentTx ---> STOP + // CommittedWithTxDeletes ----/ loop { match &mut self.stage { - ScanStage::Continue => {} ScanStage::CommittedNoTxDeletes { iter } => { // (1a) Go through the committed state for this table // but do not consider deleted rows. @@ -227,7 +216,7 @@ impl<'a> Iterator for IterMutTx<'a> { return next; } } - ScanStage::CommittedWithTxDeletes { iter, del_tables } => { + ScanStage::CommittedWithTxDeletes { iter } => { // (1b) Check the committed row's state in the current tx. // If it's been deleted, skip it. // If it's still present, yield it. @@ -254,7 +243,7 @@ impl<'a> Iterator for IterMutTx<'a> { // // As a result, in MVCC, this branch will need to check if the `row_ref` // also exists in the `tx_state.insert_tables` and ensure it is yielded only once. 
- if let next @ Some(_) = iter.find(|row_ref| !del_tables.contains(row_ref.pointer())) { + if let next @ Some(_) = iter.next() { return next; } } @@ -298,70 +287,6 @@ impl<'a> Iterator for IterTx<'a> { } } -pub struct IndexSeekIterIdMutTx<'a> { - pub(super) inserted_rows: IndexScanRangeIter<'a>, - pub(super) committed_rows: Option>, -} - -impl<'a> Iterator for IndexSeekIterIdMutTx<'a> { - type Item = RowRef<'a>; - - fn next(&mut self) -> Option { - if let Some(row_ref) = self.inserted_rows.next() { - return Some(row_ref); - } - self.committed_rows.as_mut().and_then(|i| i.next()) - } -} - -pub struct IndexSeekIterIdWithDeletedMutTx<'a> { - pub(super) inserted_rows: IndexScanRangeIter<'a>, - pub(super) committed_rows: Option>, - pub(super) del_table: &'a DeleteTable, -} - -impl<'a> Iterator for IndexSeekIterIdWithDeletedMutTx<'a> { - type Item = RowRef<'a>; - - fn next(&mut self) -> Option { - if let Some(row_ref) = self.inserted_rows.next() { - return Some(row_ref); - } - - if let Some(row_ref) = self - .committed_rows - .as_mut() - // NOTE for future MVCC implementors: - // In MVCC, it is no longer valid to elide inserts in this way. - // When a transaction inserts a row, that row *must* appear in its insert tables, - // even if the row is already present in the committed state. - // - // Imagine a chain of committed but un-squashed transactions: - // `Committed 0: Insert Row A` - `Committed 1: Delete Row A` - // where `Committed 1` happens after `Committed 0`. - // Imagine a transaction `Running 2: Insert Row A`, - // which began before `Committed 1` was committed. - // Because `Committed 1` has since been committed, - // `Running 2` *must* happen after `Committed 1`. - // Therefore, the correct sequence of events is: - // - Insert Row A - // - Delete Row A - // - Insert Row A - // This is impossible to recover if `Running 2` elides its insert. 
- // - // As a result, in MVCC, this branch will need to check if the `row_ref` - // also exists in the `tx_state.insert_tables` and ensure it is yielded only once. - .and_then(|i| i.find(|row_ref| !self.del_table.contains(row_ref.pointer()))) - { - // TODO(metrics): This doesn't actually fetch a row. - // Move this counter to `RowRef::read_row`. - // self.num_committed_rows_fetched += 1; - return Some(row_ref); - } - - None - } -} /// An [IterByColRangeTx] for an individual column value. pub type IterByColEqTx<'a, 'r> = IterByColRangeTx<'a, &'r AlgebraicValue>; /// An [IterByColRangeMutTx] for an individual column value. @@ -372,13 +297,8 @@ pub enum IterByColRangeTx<'a, R: RangeBounds> { /// When the column in question does not have an index. Scan(ScanIterByColRangeTx<'a, R>), - /// When the column has an index, and the table - /// has been modified this transaction. - Index(IndexSeekIterIdMutTx<'a>), - - /// When the column has an index, and the table - /// has not been modified in this transaction. - CommittedIndex(IndexScanRangeIter<'a>), + /// When the column has an index. + Index(IndexScanRangeIter<'a>), } impl<'a, R: RangeBounds> Iterator for IterByColRangeTx<'a, R> { @@ -386,9 +306,8 @@ impl<'a, R: RangeBounds> Iterator for IterByColRangeTx<'a, R> { fn next(&mut self) -> Option { match self { - IterByColRangeTx::Scan(range) => range.next(), - IterByColRangeTx::Index(range) => range.next(), - IterByColRangeTx::CommittedIndex(seek) => seek.next(), + IterByColRangeTx::Scan(iter) => iter.next(), + IterByColRangeTx::Index(iter) => iter.next(), } } } @@ -398,21 +317,8 @@ pub enum IterByColRangeMutTx<'a, R: RangeBounds> { /// When the column in question does not have an index. Scan(ScanIterByColRangeMutTx<'a, R>), - /// When the column has an index, and the table - /// has been modified this transaction. - Index(IndexSeekIterIdMutTx<'a>), - - /// When the column has an index, and the table - /// has been modified this transaction, and there are deleted rows. 
- IndexWithDeletes(IndexSeekIterIdWithDeletedMutTx<'a>), - - /// When the column has an index, and the table - /// has not been modified in this transaction. - CommittedIndex(IndexScanRangeIter<'a>), - - /// When the column has an index, and the table - /// has not been modified in this transaction. - CommittedIndexWithDeletes(CommittedIndexIterWithDeletedMutTx<'a>), + /// When the column has an index. + Index(IndexScanRanged<'a>), } impl<'a, R: RangeBounds> Iterator for IterByColRangeMutTx<'a, R> { @@ -422,9 +328,6 @@ impl<'a, R: RangeBounds> Iterator for IterByColRangeMutTx<'a, R> match self { IterByColRangeMutTx::Scan(range) => range.next(), IterByColRangeMutTx::Index(range) => range.next(), - IterByColRangeMutTx::IndexWithDeletes(range) => range.next(), - IterByColRangeMutTx::CommittedIndex(seek) => seek.next(), - IterByColRangeMutTx::CommittedIndexWithDeletes(seek) => seek.next(), } } } @@ -436,6 +339,7 @@ pub struct ScanIterByColRangeTx<'a, R: RangeBounds> { } impl<'a, R: RangeBounds> ScanIterByColRangeTx<'a, R> { + // TODO(perf, centril): consider taking `cols` by reference. pub(super) fn new(scan_iter: IterTx<'a>, cols: ColList, range: R) -> Self { Self { scan_iter, cols, range } } @@ -463,6 +367,7 @@ pub struct ScanIterByColRangeMutTx<'a, R: RangeBounds> { } impl<'a, R: RangeBounds> ScanIterByColRangeMutTx<'a, R> { + // TODO(perf, centril): consider taking `cols` by reference. 
pub(super) fn new(scan_iter: IterMutTx<'a>, cols: ColList, range: R) -> Self { Self { scan_iter, cols, range } } diff --git a/crates/core/src/db/datastore/locking_tx_datastore/tx.rs b/crates/core/src/db/datastore/locking_tx_datastore/tx.rs index 71721251a5..891e3e34d8 100644 --- a/crates/core/src/db/datastore/locking_tx_datastore/tx.rs +++ b/crates/core/src/db/datastore/locking_tx_datastore/tx.rs @@ -1,7 +1,6 @@ -use super::datastore::TxMetrics; use super::{ committed_state::CommittedState, - datastore::Result, + datastore::{Result, TxMetrics}, state_view::{IterByColRangeTx, StateView}, IterByColEqTx, SharedReadGuard, }; @@ -69,12 +68,8 @@ impl StateView for TxId { cols: ColList, range: R, ) -> Result> { - match self.committed_state_shared_lock.index_seek(table_id, &cols, &range) { - Some(committed_rows) => Ok(IterByColRangeTx::CommittedIndex(committed_rows)), - None => self - .committed_state_shared_lock - .iter_by_col_range(table_id, cols, range), - } + self.committed_state_shared_lock + .iter_by_col_range(table_id, cols, range) } fn iter_by_col_eq<'a, 'r>( diff --git a/crates/core/src/db/datastore/locking_tx_datastore/tx_state.rs b/crates/core/src/db/datastore/locking_tx_datastore/tx_state.rs index ff302bddca..8e84fccf03 100644 --- a/crates/core/src/db/datastore/locking_tx_datastore/tx_state.rs +++ b/crates/core/src/db/datastore/locking_tx_datastore/tx_state.rs @@ -1,19 +1,23 @@ -use super::delete_table::DeleteTable; +use super::{delete_table::DeleteTable, sequence::Sequence}; use core::ops::RangeBounds; -use spacetimedb_data_structures::map::{IntMap, IntSet}; -use spacetimedb_primitives::{ColList, IndexId, TableId}; +use spacetimedb_data_structures::map::IntMap; +use spacetimedb_lib::db::auth::StAccess; +use spacetimedb_primitives::{ColList, ConstraintId, IndexId, SequenceId, TableId}; use spacetimedb_sats::AlgebraicValue; +use spacetimedb_schema::schema::{ConstraintSchema, IndexSchema, SequenceSchema}; use spacetimedb_table::{ blob_store::{BlobStore, 
HashMapBlobStore}, indexes::{RowPointer, SquashedOffset}, + pointer_map::PointerMap, static_assert_size, table::{IndexScanRangeIter, RowRef, Table, TableAndIndex}, + table_index::TableIndex, }; use std::collections::{btree_map, BTreeMap}; +use thin_vec::ThinVec; /// A mapping to find the actual index given an `IndexId`. pub(super) type IndexIdMap = IntMap; -pub(super) type RemovedIndexIdSet = IntSet; /// `TxState` tracks all of the modifications made during a particular transaction. /// Rows inserted during a transaction will be added to insert_tables, and similarly, @@ -67,16 +71,47 @@ pub(super) struct TxState { /// - Traverse all rows in the `insert_tables` and free each of their blobs during rollback. pub(super) blob_store: HashMapBlobStore, - /// Provides fast lookup for index id -> an index. - pub(super) index_id_map: IndexIdMap, - - /// Lists all the `IndexId` that are to be removed from `CommittedState::index_id_map`. - // This is in an `Option>` to reduce the size of `TxState` - it's very uncommon - // that this would be created. - pub(super) index_id_map_removals: Option>, + /// All of the immediately applied schema changes to the committed state during this transaction. + /// + /// This is stored as a `ThinVec` as it would be very uncommon to add anything to this list. + pub(super) pending_schema_changes: ThinVec, } -static_assert_size!(TxState, 120); +/// A pending schema change is a change to a `TableSchema` +/// that has been applied immediately to the [`CommittedState`](super::committed_state::CommittedState) +/// and which need to be reverted if the transaction fails. +/// +/// The goal here is that by applying changes immediately, +/// most of the datastore does not have to care about schema change transactionality. +/// The places that do need to care about changes are those that make them, and merge/rollback. +/// Architecting this way should benefit performance both during transactions and merge. 
+/// On rollback, it should be fairly cheap to e.g., just re-add an index or drop it on the floor. +pub(super) enum PendingSchemaChange { + /// The [`TableIndex`] / [`IndexSchema`] with `IndexId` + /// was removed from the table with [`TableId`]. + IndexRemoved(TableId, IndexId, TableIndex, IndexSchema), + /// The index with [`IndexId`] was added. + /// If adding this index caused the pointer map to be removed, + /// it will be present here. + IndexAdded(TableId, IndexId, Option), + /// The [`Table`] with [`TableId`] was removed. + TableRemoved(TableId, Table), + /// The table with [`TableId`] was added. + TableAdded(TableId), + /// The access of the table with [`TableId`] was changed. + /// The old access was stored. + TableAlterAccess(TableId, StAccess), + /// The constraint with [`ConstraintSchema`] was removed from the table with [`TableId`]. + ConstraintRemoved(TableId, ConstraintSchema), + /// The constraint with [`ConstraintId`] was added to the table with [`TableId`]. + ConstraintAdded(TableId, ConstraintId), + /// The [`Sequence`] with [`SequenceSchema`] was removed from the table with [`TableId`]. + SequenceRemoved(TableId, Sequence, SequenceSchema), + /// The sequence with [`SequenceId`] was added to the table with [`TableId`]. + SequenceAdded(TableId, SequenceId), +} + +static_assert_size!(TxState, 88); impl TxState { /// Returns the row count in insert tables @@ -107,11 +142,6 @@ impl TxState { .map(|i| i.seek_range(range)) } - /// Returns the table associated with the given `index_id`, if any. - pub(super) fn get_table_for_index(&self, index_id: IndexId) -> Option { - self.index_id_map.get(&index_id).copied() - } - /// Returns the table for `table_id` combined with the index for `index_id`, if both exist. 
pub(super) fn get_index_by_id_with_table(&self, table_id: TableId, index_id: IndexId) -> Option> { self.insert_tables @@ -173,28 +203,22 @@ impl TxState { Some((table, blob_store)) } - pub(super) fn get_table_and_blob_store_or_maybe_create_from<'this>( - &'this mut self, + pub(super) fn get_table_and_blob_store_or_create_from( + &mut self, table_id: TableId, - template: Option<&Table>, - ) -> Option<( - &'this mut Table, - &'this mut dyn BlobStore, - &'this mut IndexIdMap, - &'this mut DeleteTable, - )> { + template: &Table, + ) -> TxTableForInsertion<'_> { let insert_tables = &mut self.insert_tables; let blob_store = &mut self.blob_store; - let idx_map = &mut self.index_id_map; let table = match insert_tables.entry(table_id) { btree_map::Entry::Vacant(e) => { - let new_table = template?.clone_structure(SquashedOffset::TX_STATE); + let new_table = template.clone_structure(SquashedOffset::TX_STATE); e.insert(new_table) } btree_map::Entry::Occupied(e) => e.into_mut(), }; let delete_table = get_delete_table_mut(&mut self.delete_tables, table_id, table); - Some((table, blob_store, idx_map, delete_table)) + (table, blob_store, delete_table) } /// Assumes that the insert and delete tables exist for `table_id` and fetches them. @@ -202,10 +226,7 @@ impl TxState { /// # Safety /// /// The insert and delete tables must exist. - pub unsafe fn assume_present_get_mut_table( - &mut self, - table_id: TableId, - ) -> (&mut Table, &mut dyn BlobStore, &mut DeleteTable) { + pub unsafe fn assume_present_get_mut_table(&mut self, table_id: TableId) -> TxTableForInsertion<'_> { let tx_blob_store: &mut dyn BlobStore = &mut self.blob_store; let tx_table = self.insert_tables.get_mut(&table_id); // SAFETY: we successfully got a `tx_table` before and haven't removed it since. 
@@ -217,6 +238,8 @@ impl TxState { } } +pub(super) type TxTableForInsertion<'a> = (&'a mut Table, &'a mut dyn BlobStore, &'a mut DeleteTable); + fn get_delete_table_mut<'a>( delete_tables: &'a mut BTreeMap, table_id: TableId, diff --git a/crates/core/src/vm.rs b/crates/core/src/vm.rs index da0491af63..4c03bf4d51 100644 --- a/crates/core/src/vm.rs +++ b/crates/core/src/vm.rs @@ -353,7 +353,7 @@ static_assert_size!( fn(AlgebraicValue) -> Result, DBError>, IterByColRangeTx<'static, AlgebraicValue>, >, - 232 + 144 ); static_assert_size!( IndexSemiJoinLeft< diff --git a/crates/sats/src/product_value.rs b/crates/sats/src/product_value.rs index bace410234..5db2b7c534 100644 --- a/crates/sats/src/product_value.rs +++ b/crates/sats/src/product_value.rs @@ -87,7 +87,7 @@ impl ProductValue { /// (including zero) fields, otherwise it will consist of a single [AlgebraicValue]. /// /// **Parameters:** - /// - `cols`: A [ColList] containing the indexes of fields to be projected.s + /// - `cols`: A [ColList] containing the indexes of fields to be projected. pub fn project(&self, cols: &ColList) -> Result { if let Some(head) = cols.as_singleton() { self.get_field(head.idx(), None).cloned() diff --git a/crates/schema/src/schema.rs b/crates/schema/src/schema.rs index cf14c675c7..5e7b9618f2 100644 --- a/crates/schema/src/schema.rs +++ b/crates/schema/src/schema.rs @@ -6,6 +6,7 @@ // TODO(1.0): change all the `Box`s in this file to `Identifier`. // This doesn't affect the ABI so can wait until 1.0. +use core::mem; use itertools::Itertools; use spacetimedb_lib::db::auth::{StAccess, StTableType}; use spacetimedb_lib::db::error::{DefType, SchemaError}; @@ -191,11 +192,13 @@ impl TableSchema { &self.columns } - /// Clear all the [Self::indexes], [Self::sequences] & [Self::constraints] - pub fn clear_adjacent_schemas(&mut self) { - self.indexes.clear(); - self.sequences.clear(); - self.constraints.clear(); + /// Extracts all the [Self::indexes], [Self::sequences], and [Self::constraints]. 
+ pub fn take_adjacent_schemas(&mut self) -> (Vec, Vec, Vec) { + ( + mem::take(&mut self.indexes), + mem::take(&mut self.sequences), + mem::take(&mut self.constraints), + ) } // Crud operation on adjacent schemas @@ -210,8 +213,8 @@ impl TableSchema { } /// Removes the given `sequence_id` - pub fn remove_sequence(&mut self, sequence_id: SequenceId) { - self.sequences.retain(|x| x.sequence_id != sequence_id) + pub fn remove_sequence(&mut self, sequence_id: SequenceId) -> Option { + find_remove(&mut self.sequences, |x| x.sequence_id == sequence_id) } /// Add OR replace the [IndexSchema] @@ -224,8 +227,8 @@ impl TableSchema { } /// Removes the given `index_id` - pub fn remove_index(&mut self, index_id: IndexId) { - self.indexes.retain(|x| x.index_id != index_id) + pub fn remove_index(&mut self, index_id: IndexId) -> Option { + find_remove(&mut self.indexes, |x| x.index_id == index_id) } /// Add OR replace the [ConstraintSchema] @@ -242,8 +245,8 @@ impl TableSchema { } /// Removes the given `index_id` - pub fn remove_constraint(&mut self, constraint_id: ConstraintId) { - self.constraints.retain(|x| x.constraint_id != constraint_id) + pub fn remove_constraint(&mut self, constraint_id: ConstraintId) -> Option { + find_remove(&mut self.constraints, |x| x.constraint_id == constraint_id) } /// Concatenate the column names from the `columns` @@ -532,6 +535,12 @@ impl TableSchema { } } +/// Removes and returns the first element satisfying `predicate` in `vec`. +fn find_remove(vec: &mut Vec, predicate: impl Fn(&T) -> bool) -> Option { + let pos = vec.iter().position(predicate)?; + Some(vec.remove(pos)) +} + /// Like `assert_eq!` for `anyhow`, but `$msg` is just a string, not a format string. macro_rules! 
ensure_eq { ($a:expr, $b:expr, $msg:expr) => { diff --git a/crates/table/src/table.rs b/crates/table/src/table.rs index adae51895d..71de95424a 100644 --- a/crates/table/src/table.rs +++ b/crates/table/src/table.rs @@ -40,7 +40,11 @@ use spacetimedb_sats::{ ser::{Serialize, Serializer}, u256, AlgebraicValue, ProductType, ProductValue, }; -use spacetimedb_schema::{def::IndexAlgorithm, schema::TableSchema, type_for_generate::PrimitiveType}; +use spacetimedb_schema::{ + def::IndexAlgorithm, + schema::{IndexSchema, TableSchema}, + type_for_generate::PrimitiveType, +}; use std::{ collections::{btree_map, BTreeMap}, sync::Arc, @@ -1062,8 +1066,8 @@ impl Table { /// If none but `self` refers to the schema, then the mutation will be in-place. /// Otherwise, the schema must be cloned, mutated, /// and then the cloned version is written back to the table. - pub fn with_mut_schema(&mut self, with: impl FnOnce(&mut TableSchema)) { - with(Arc::make_mut(&mut self.schema)); + pub fn with_mut_schema(&mut self, with: impl FnOnce(&mut TableSchema) -> R) -> R { + with(Arc::make_mut(&mut self.schema)) } /// Returns a new [`TableIndex`] for `table`. @@ -1100,35 +1104,42 @@ impl Table { /// # Safety /// /// Caller must promise that `index` was constructed with the same row type/layout as this table. - pub unsafe fn add_index(&mut self, index_id: IndexId, index: TableIndex) { + pub unsafe fn add_index(&mut self, index_id: IndexId, index: TableIndex) -> Option { let is_unique = index.is_unique(); self.indexes.insert(index_id, index); // Remove the pointer map, if any. if is_unique { - self.pointer_map = None; + self.pointer_map.take() + } else { + None } } /// Removes an index from the table. /// /// Returns whether an index existed with `index_id`. 
- pub fn delete_index(&mut self, blob_store: &dyn BlobStore, index_id: IndexId) -> bool { - let index = self.indexes.remove(&index_id); - let ret = index.is_some(); + pub fn delete_index( + &mut self, + blob_store: &dyn BlobStore, + index_id: IndexId, + pointer_map: Option, + ) -> Option<(TableIndex, IndexSchema)> { + let index = self.indexes.remove(&index_id)?; // If we removed the last unique index, add a pointer map. - if index.is_some_and(|i| i.is_unique()) && !self.indexes.values().any(|idx| idx.is_unique()) { - self.rebuild_pointer_map(blob_store); + if index.is_unique() && !self.indexes.values().any(|idx| idx.is_unique()) { + self.pointer_map = Some(pointer_map.unwrap_or_else(|| self.rebuild_pointer_map(blob_store))); } // Remove index from schema. // // This likely will do a clone-write as over time? // The schema might have found other referents. - self.with_mut_schema(|s| s.indexes.retain(|x| x.index_id != index_id)); - - ret + let schema = self + .with_mut_schema(|s| s.remove_index(index_id)) + .expect("there should be an index with `index_id`"); + Some((index, schema)) } /// Returns an iterator over all the rows of `self`, yielded as [`RefRef`]s. @@ -1225,7 +1236,7 @@ impl Table { // Recompute table metadata based on the new pages. // Compute the row count first, in case later computations want to use it as a capacity to pre-allocate. self.compute_row_count(blob_store); - self.rebuild_pointer_map(blob_store); + self.pointer_map = Some(self.rebuild_pointer_map(blob_store)); } /// Consumes the table, returning some constituents needed for merge. @@ -1735,16 +1746,10 @@ impl<'a> Iterator for IndexScanPointIter<'a> { type Item = RowRef<'a>; fn next(&mut self) -> Option { - let ptr = self.btree_index_iter.next()?; - // FIXME: Determine if this is correct and if so use `_unchecked`. - // Will a table's index necessarily hold only pointers into that index? 
- // Edge case: if an index is added during a transaction which then scans that index, - // it appears that the newly-created `TxState` index - // will also hold pointers into the `CommittedState`. - // - // SAFETY: Assuming this is correct, - // `ptr` came from the index, which always holds pointers to valid rows. - self.table.get_row_ref(self.blob_store, ptr) + self.btree_index_iter.next().map(|ptr| { + // SAFETY: `ptr` came from the index, which always holds pointers to valid rows for its table. + unsafe { self.table.get_row_ref_unchecked(self.blob_store, ptr) } + }) } } @@ -1765,16 +1770,10 @@ impl<'a> Iterator for IndexScanRangeIter<'a> { type Item = RowRef<'a>; fn next(&mut self) -> Option { - let ptr = self.btree_index_iter.next()?; - // FIXME: Determine if this is correct and if so use `_unchecked`. - // Will a table's index necessarily hold only pointers into that index? - // Edge case: if an index is added during a transaction which then scans that index, - // it appears that the newly-created `TxState` index - // will also hold pointers into the `CommittedState`. - // - // SAFETY: Assuming this is correct, - // `ptr` came from the index, which always holds pointers to valid rows. - self.table.get_row_ref(self.blob_store, ptr) + self.btree_index_iter.next().map(|ptr| { + // SAFETY: `ptr` came from the index, which always holds pointers to valid rows for its table. + unsafe { self.table.get_row_ref_unchecked(self.blob_store, ptr) } + }) } } @@ -1930,14 +1929,12 @@ impl Table { /// Called when restoring from a snapshot after installing the pages, /// but after computing the row count, /// since snapshots do not save the pointer map.. - fn rebuild_pointer_map(&mut self, blob_store: &dyn BlobStore) { + fn rebuild_pointer_map(&mut self, blob_store: &dyn BlobStore) -> PointerMap { // TODO(perf): Pre-allocate `PointerMap.map` with capacity `self.row_count`. // Alternatively, do this at the same time as `compute_row_count`. 
- let ptrs = self - .scan_rows(blob_store) + self.scan_rows(blob_store) .map(|row_ref| (row_ref.row_hash(), row_ref.pointer())) - .collect::(); - self.pointer_map = Some(ptrs); + .collect() } /// Compute and store `self.row_count` and `self.blob_store_bytes` diff --git a/crates/table/src/table_index/mod.rs b/crates/table/src/table_index/mod.rs index 6d08223e7d..394c0fb07a 100644 --- a/crates/table/src/table_index/mod.rs +++ b/crates/table/src/table_index/mod.rs @@ -1181,7 +1181,7 @@ impl TableIndex { } } - /// Extends [`TableIndex`] with `rows`.s + /// Extends [`TableIndex`] with `rows`. /// /// Returns the first unique constraint violation caused when adding this index, if any. /// @@ -1201,6 +1201,55 @@ impl TableIndex { .try_for_each(|row_ref| unsafe { self.check_and_insert(row_ref) }) } + /// Returns an error with the first unique constraint violation that + /// would occur if `self` and `other` were to be merged. + pub fn can_merge(&self, other: &Self) -> Result<(), RowPointer> { + use TypedIndex::*; + match (&self.idx, &other.idx) { + // For non-unique indices, it's always possible to merge. + (BtreeBool(_), BtreeBool(_)) + | (BtreeU8(_), BtreeU8(_)) + | (BtreeSumTag(_), BtreeSumTag(_)) + | (BtreeI8(_), BtreeI8(_)) + | (BtreeU16(_), BtreeU16(_)) + | (BtreeI16(_), BtreeI16(_)) + | (BtreeU32(_), BtreeU32(_)) + | (BtreeI32(_), BtreeI32(_)) + | (BtreeU64(_), BtreeU64(_)) + | (BtreeI64(_), BtreeI64(_)) + | (BtreeU128(_), BtreeU128(_)) + | (BtreeI128(_), BtreeI128(_)) + | (BtreeU256(_), BtreeU256(_)) + | (BtreeI256(_), BtreeI256(_)) + | (BtreeString(_), BtreeString(_)) + | (BtreeAV(_), BtreeAV(_)) => Ok(()), + // For unique indices, we'll need to see if everything in `other` can be added to `idx`. 
+ (UniqueBtreeBool(idx), UniqueBtreeBool(other)) => idx.can_merge(other).map_err(|ptr| *ptr), + (UniqueBtreeU8(idx), UniqueBtreeU8(other)) => idx.can_merge(other).map_err(|ptr| *ptr), + (UniqueBtreeSumTag(idx), UniqueBtreeSumTag(other)) => idx.can_merge(other).map_err(|ptr| *ptr), + (UniqueBtreeI8(idx), UniqueBtreeI8(other)) => idx.can_merge(other).map_err(|ptr| *ptr), + (UniqueBtreeU16(idx), UniqueBtreeU16(other)) => idx.can_merge(other).map_err(|ptr| *ptr), + (UniqueBtreeI16(idx), UniqueBtreeI16(other)) => idx.can_merge(other).map_err(|ptr| *ptr), + (UniqueBtreeU32(idx), UniqueBtreeU32(other)) => idx.can_merge(other).map_err(|ptr| *ptr), + (UniqueBtreeI32(idx), UniqueBtreeI32(other)) => idx.can_merge(other).map_err(|ptr| *ptr), + (UniqueBtreeU64(idx), UniqueBtreeU64(other)) => idx.can_merge(other).map_err(|ptr| *ptr), + (UniqueBtreeI64(idx), UniqueBtreeI64(other)) => idx.can_merge(other).map_err(|ptr| *ptr), + (UniqueBtreeU128(idx), UniqueBtreeU128(other)) => idx.can_merge(other).map_err(|ptr| *ptr), + (UniqueBtreeI128(idx), UniqueBtreeI128(other)) => idx.can_merge(other).map_err(|ptr| *ptr), + (UniqueBtreeU256(idx), UniqueBtreeU256(other)) => idx.can_merge(other).map_err(|ptr| *ptr), + (UniqueBtreeI256(idx), UniqueBtreeI256(other)) => idx.can_merge(other).map_err(|ptr| *ptr), + (UniqueBtreeString(idx), UniqueBtreeString(other)) => idx.can_merge(other).map_err(|ptr| *ptr), + (UniqueBtreeAV(idx), UniqueBtreeAV(other)) => idx.can_merge(other).map_err(|ptr| *ptr), + (UniqueDirectU8(idx), UniqueDirectU8(other)) => idx.can_merge(other), + (UniqueDirectSumTag(idx), UniqueDirectSumTag(other)) => idx.can_merge(other), + (UniqueDirectU16(idx), UniqueDirectU16(other)) => idx.can_merge(other), + (UniqueDirectU32(idx), UniqueDirectU32(other)) => idx.can_merge(other), + (UniqueDirectU64(idx), UniqueDirectU64(other)) => idx.can_merge(other), + + _ => unreachable!("non-matching index kinds"), + } + } + /// Deletes all entries from the index, leaving it empty. 
/// /// When inserting a newly-created index into the committed state, diff --git a/crates/table/src/table_index/unique_direct_fixed_cap_index.rs b/crates/table/src/table_index/unique_direct_fixed_cap_index.rs index e00dac08da..dbc71b672e 100644 --- a/crates/table/src/table_index/unique_direct_fixed_cap_index.rs +++ b/crates/table/src/table_index/unique_direct_fixed_cap_index.rs @@ -120,6 +120,18 @@ impl UniqueDirectFixedCapIndex { self.array.fill(NONE_PTR); self.len = 0; } + + /// Returns `Ok(())` if no key has an occupied slot in both `self` and `other`, + /// or else an error with the pointer found in the first such slot of `self`. + pub(crate) fn can_merge(&self, other: &UniqueDirectFixedCapIndex) -> Result<(), RowPointer> { + for (slot_s, slot_o) in self.array.iter().zip(other.array.iter()) { + if *slot_s != NONE_PTR && *slot_o != NONE_PTR { + // For the same key, we found both slots occupied, so we cannot merge. + return Err(slot_s.with_reserved_bit(false)); + } + } + Ok(()) + } } /// An iterator over a range of keys in a [`UniqueDirectFixedCapIndex`]. diff --git a/crates/table/src/table_index/unique_direct_index.rs b/crates/table/src/table_index/unique_direct_index.rs index 0f30df2317..865ec0ff88 100644 --- a/crates/table/src/table_index/unique_direct_index.rs +++ b/crates/table/src/table_index/unique_direct_index.rs @@ -204,6 +204,25 @@ impl UniqueDirectIndex { self.outer.clear(); self.len = 0; } + + /// Returns `Ok(())` if no key has an occupied slot in both `self` and `other`, + /// or else an error with the pointer found in the first such slot of `self`. + pub(crate) fn can_merge(&self, other: &UniqueDirectIndex) -> Result<(), RowPointer> { + for (inner_s, inner_o) in self.outer.iter().zip(&other.outer) { + let (Some(inner_s), Some(inner_o)) = (inner_s, inner_o) else { + continue; + }; + + for (slot_s, slot_o) in inner_s.inner.iter().zip(inner_o.inner.iter()) { + if *slot_s != NONE_PTR && *slot_o != NONE_PTR { + // For the same key, we found both slots occupied, so we cannot merge. 
+ return Err(slot_s.with_reserved_bit(false)); + } + } + } + + Ok(()) + } } /// An iterator over the potential value in a [`UniqueDirectMap`] for a given key. diff --git a/crates/table/src/table_index/uniquemap.rs b/crates/table/src/table_index/uniquemap.rs index edee31df32..b76dea08c5 100644 --- a/crates/table/src/table_index/uniquemap.rs +++ b/crates/table/src/table_index/uniquemap.rs @@ -81,6 +81,15 @@ impl UniqueMap { pub fn clear(&mut self) { self.map.clear(); } + + /// Returns `Ok(())` if no key of `other` is also present in `self`, + /// or else an error with the value that `self` stores for the first such key. + pub(crate) fn can_merge(&self, other: &UniqueMap) -> Result<(), &V> { + let Some(found) = other.map.keys().find_map(|key| self.map.get(key)) else { + return Ok(()); + }; + Err(found) + } } /// An iterator over the potential value in a [`UniqueMap`] for a given key.