diff --git a/crates/core/src/db/relational_db.rs b/crates/core/src/db/relational_db.rs index 72deed1381..bd628b93ee 100644 --- a/crates/core/src/db/relational_db.rs +++ b/crates/core/src/db/relational_db.rs @@ -353,6 +353,12 @@ impl RelationalDB { self.database_identity, schema.table_name ); + // FIXME: `create_table` of tables with sequences + // gives a different initial allocation than is given by `CommittedState::bootstrap_system_tables`. + // See comment in that method. + // This results in requiring `CommittedState::fixup_delete_duplicate_system_sequence_rows`. + // Fix `migrate_system_tables` to create new system sequences + // with the same initial allocation as `bootstrap_system_tables`. let _ = self.create_table(&mut tx, schema.clone())?; } } diff --git a/crates/datastore/src/locking_tx_datastore/committed_state.rs b/crates/datastore/src/locking_tx_datastore/committed_state.rs index ba865f02b5..5ee9049cd1 100644 --- a/crates/datastore/src/locking_tx_datastore/committed_state.rs +++ b/crates/datastore/src/locking_tx_datastore/committed_state.rs @@ -16,12 +16,13 @@ use crate::{ IterByColRangeTx, }, system_tables::{ - is_built_in_meta_row, system_tables, StColumnRow, StConstraintData, StConstraintRow, StIndexRow, StSequenceRow, - StTableFields, StTableRow, StViewRow, SystemTable, ST_CLIENT_ID, ST_CLIENT_IDX, ST_COLUMN_ID, ST_COLUMN_IDX, - ST_COLUMN_NAME, ST_CONSTRAINT_ID, ST_CONSTRAINT_IDX, ST_CONSTRAINT_NAME, ST_INDEX_ID, ST_INDEX_IDX, - ST_INDEX_NAME, ST_MODULE_ID, ST_MODULE_IDX, ST_ROW_LEVEL_SECURITY_ID, ST_ROW_LEVEL_SECURITY_IDX, - ST_SCHEDULED_ID, ST_SCHEDULED_IDX, ST_SEQUENCE_ID, ST_SEQUENCE_IDX, ST_SEQUENCE_NAME, ST_TABLE_ID, - ST_TABLE_IDX, ST_VAR_ID, ST_VAR_IDX, ST_VIEW_ARG_ID, ST_VIEW_ARG_IDX, + is_built_in_meta_row, system_tables, table_id_is_reserved, StColumnRow, StConstraintData, StConstraintRow, + StIndexRow, StSequenceFields, StSequenceRow, StTableFields, StTableRow, StViewRow, SystemTable, ST_CLIENT_ID, + ST_CLIENT_IDX, ST_COLUMN_ID, ST_COLUMN_IDX, ST_COLUMN_NAME, ST_CONSTRAINT_ID, ST_CONSTRAINT_IDX, + ST_CONSTRAINT_NAME, ST_INDEX_ID, ST_INDEX_IDX, ST_INDEX_NAME, ST_MODULE_ID, ST_MODULE_IDX, + ST_ROW_LEVEL_SECURITY_ID, ST_ROW_LEVEL_SECURITY_IDX, ST_SCHEDULED_ID, ST_SCHEDULED_IDX, ST_SEQUENCE_ID, + ST_SEQUENCE_IDX, ST_SEQUENCE_NAME, ST_TABLE_ID, ST_TABLE_IDX, ST_VAR_ID, ST_VAR_IDX, ST_VIEW_ARG_ID, + ST_VIEW_ARG_IDX, }, traits::{EphemeralTables, TxData}, }; @@ -34,10 +35,10 @@ use crate::{ }; use anyhow::anyhow; use core::{convert::Infallible, ops::RangeBounds}; -use spacetimedb_data_structures::map::{HashSet, IntMap, IntSet}; +use spacetimedb_data_structures::map::{HashMap, HashSet, IntMap, IntSet}; use spacetimedb_durability::TxOffset; use spacetimedb_lib::{db::auth::StTableType, Identity}; -use spacetimedb_primitives::{ColId, ColList, ColSet, IndexId, TableId, ViewId}; +use spacetimedb_primitives::{ColId, ColList, ColSet, IndexId, SequenceId, TableId, ViewId}; use spacetimedb_sats::{algebraic_value::de::ValueDeserializer, memory_usage::MemoryUsage, Deserialize}; use spacetimedb_sats::{AlgebraicValue, ProductValue}; use spacetimedb_schema::{ @@ -201,6 +202,93 @@ impl CommittedState { } } + /// Delete all but the highest-allocation `st_sequence` row for each system sequence. + /// + /// Prior versions of `RelationalDb::migrate_system_tables` (defined in the `core` crate) + /// initialized newly-created system sequences to `allocation: 4097`, + /// while `committed_state::bootstrap_system_tables` sets `allocation: 4096`. + /// This affected the system table migration which added + /// `st_view_view_id_seq` and `st_view_arg_id_seq`. + /// As a result, when replaying these databases' commitlogs without a snapshot, + /// we will end up with two rows in `st_sequence` for each of these sequences, + /// resulting in a unique constraint violation in `CommittedState::build_indexes`. + /// We call this method in [`super::datastore::Locking::rebuild_state_after_replay`] + /// to avoid that unique constraint violation. + pub(super) fn fixup_delete_duplicate_system_sequence_rows(&mut self) { + struct StSequenceRowInfo { + sequence_id: SequenceId, + allocated: i128, + row_pointer: RowPointer, + } + + // Get all the `st_sequence` rows which refer to sequences on system tables, + // including any duplicates caused by the bug described above. + let sequence_rows = self + .table_scan(ST_SEQUENCE_ID) + .expect("`st_sequence` should exist") + .filter_map(|row_ref| { + // Read the table ID to which the sequence refers, + // in order to determine if this is a system sequence or not. + let table_id = row_ref + .read_col::(StSequenceFields::TableId) + .expect("`st_sequence` row should conform to `st_sequence` schema"); + + // If this sequence refers to a system table, it may need a fixup. + // User tables' sequences will never need fixups. + table_id_is_reserved(table_id).then(|| { + let allocated = row_ref + .read_col::(StSequenceFields::Allocated) + .expect("`st_sequence` row should conform to `st_sequence` schema"); + let sequence_id = row_ref + .read_col::(StSequenceFields::SequenceId) + .expect("`st_sequence` row should conform to `st_sequence` schema"); + StSequenceRowInfo { + allocated, + sequence_id, + row_pointer: row_ref.pointer(), + } + }) + }) + .collect::>(); + + let (st_sequence, blob_store, ..) = self + .get_table_and_blob_store_mut(ST_SEQUENCE_ID) + .expect("`st_sequence` should exist"); + + // Track the row with the highest allocation for each sequence. + let mut highest_allocations: HashMap = HashMap::default(); + + for StSequenceRowInfo { + sequence_id, + allocated, + row_pointer, + } in sequence_rows + { + // For each `st_sequence` row which refers to a system table, + // if we've already seen a row for the same sequence, + // keep only the row with the higher allocation. + if let Some((prev_allocated, prev_row_pointer)) = + highest_allocations.insert(sequence_id, (allocated, row_pointer)) + { + // We have a duplicate row. We want to keep whichever has the higher `allocated`, + // and delete the other. + let row_pointer_to_delete = if prev_allocated > allocated { + // The previous row has a higher allocation than the new row, + // so delete the new row and restore `previous` to `highest_allocations`. + highest_allocations.insert(sequence_id, (prev_allocated, prev_row_pointer)); + row_pointer + } else { + // The previous row does not have a higher allocation than the new, + // so delete the previous row and keep the new one. + prev_row_pointer + }; + + st_sequence.delete(blob_store, row_pointer_to_delete, |_| ()) + .expect("Duplicated `st_sequence` row at `row_pointer_to_delete` should be present in `st_sequence` during fixup"); + } + } + } + /// Extremely delicate function to bootstrap the system tables. /// Don't update this unless you know what you're doing. pub(super) fn bootstrap_system_tables(&mut self, database_identity: Identity) -> Result<()> { @@ -442,11 +530,12 @@ impl CommittedState { Err(InsertError::IndexError(e)) => return Err(IndexError::UniqueConstraintViolation(e).into()), }; + let row_ptr = row_ref.pointer(); + if table_id == ST_COLUMN_ID { // We've made a modification to `st_column`. // The type of a table has changed, so figure out which. // The first column in `StColumnRow` is `table_id`. - let row_ptr = row_ref.pointer(); self.st_column_changed(row, row_ptr)?; } diff --git a/crates/datastore/src/locking_tx_datastore/datastore.rs b/crates/datastore/src/locking_tx_datastore/datastore.rs index e5d65df988..1fcebe9696 100644 --- a/crates/datastore/src/locking_tx_datastore/datastore.rs +++ b/crates/datastore/src/locking_tx_datastore/datastore.rs @@ -141,6 +141,18 @@ impl Locking { /// There may eventually be better way to do this, but this will have to do for now. pub fn rebuild_state_after_replay(&self) -> Result<()> { let mut committed_state = self.committed_state.write_arc(); + + // Prior versions of `RelationalDb::migrate_system_tables` (defined in the `core` crate) + // initialized newly-created system sequences to `allocation: 4097`, + // while `committed_state::bootstrap_system_tables` sets `allocation: 4096`. + // This affected the system table migration which added + // `st_view_view_id_seq` and `st_view_arg_id_seq`. + // As a result, when replaying these databases' commitlogs without a snapshot, + // we will end up with two rows in `st_sequence` for each of these sequences, + // resulting in a unique constraint violation in `CommittedState::build_indexes`. + // We fix this by, for each system sequence, deleting all but the row with the highest allocation. + committed_state.fixup_delete_duplicate_system_sequence_rows(); + // `build_missing_tables` must be called before indexes. // Honestly this should maybe just be one big procedure. // See John Carmack's philosophy on this.