From 4f2c064fe597e959bdd0f94a2118ff6687803149 Mon Sep 17 00:00:00 2001 From: Shubham Mishra Date: Mon, 27 Apr 2026 18:17:16 +0530 Subject: [PATCH] fix: update seq table when migrating table. (#4902) # Description of Changes `MutTxId::add_columns_to_table` creates new table but only copies sequences to in-memory state, which causes `autoinc` columns to reset on module restart. The existing implementation relies on `create_table_and_update_seq` helper, which only updates the sequence state in memory. This change ensures that the `allocation` is also persisted to the system table, keeping it consistent across restarts. # API and ABI breaking changes NA # Expected complexity level and risk 2 # Testing Added a test, which migrate table and checks for `autoinc` column value without and with restart. --------- Signed-off-by: Shubham Mishra Co-authored-by: Mazdak Farrokhzad --- crates/core/src/db/update.rs | 120 +++++++++++++++++- .../src/locking_tx_datastore/mut_tx.rs | 79 +++++++++--- .../src/locking_tx_datastore/sequence.rs | 11 ++ 3 files changed, 187 insertions(+), 23 deletions(-) diff --git a/crates/core/src/db/update.rs b/crates/core/src/db/update.rs index 6e7db31f7..6c7c3bd9f 100644 --- a/crates/core/src/db/update.rs +++ b/crates/core/src/db/update.rs @@ -340,7 +340,7 @@ mod test { host::module_host::create_table_from_def, }; use spacetimedb_datastore::locking_tx_datastore::PendingSchemaChange; - use spacetimedb_lib::db::raw_def::v9::{btree, RawModuleDefV9Builder, TableAccess}; + use spacetimedb_lib::db::raw_def::v9::{btree, RawIndexAlgorithm, RawModuleDefV9Builder, TableAccess}; use spacetimedb_sats::{product, AlgebraicType, AlgebraicType::U64}; use spacetimedb_schema::{auto_migrate::ponder_migrate, def::ModuleDef}; @@ -432,7 +432,7 @@ mod test { let stdb = TestDB::durable()?; // Step 1: Table with a primary key (requires unique constraint + index). - let module_v1 = { + let module_v1: ModuleDef = { let mut builder = RawModuleDefV9Builder::new(); builder .build_table_with_new_type("person", [("name", AlgebraicType::String)], true) @@ -446,7 +446,7 @@ mod test { }; // Step 2: Same table, but primary key removed. - let module_v2 = { + let module_v2: ModuleDef = { let mut builder = RawModuleDefV9Builder::new(); builder .build_table_with_new_type("person", [("name", AlgebraicType::String)], true) @@ -580,4 +580,118 @@ mod test { ); Ok(()) } + + /// Verifies that `autoinc` sequence survives a schema migration that adds a column, + /// and is also correctly persisted across database replay. + /// + /// Flow: + /// - Create v1 schema and consume a few sequence values. + /// - Migrate to v2 (adds a column with a default). + /// - Ensure next insert continues the sequence (no reset). + /// - Reopen DB and verify allocation cursor is still preserved. + #[test] + fn auto_inc_sequence_survives_add_column_migration() -> anyhow::Result<()> { + let auth_ctx = AuthCtx::for_testing(); + let stdb = TestDB::durable()?; + + // Define the old module that was before. + let module_v1: ModuleDef = { + let mut b = RawModuleDefV9Builder::new(); + b.build_table_with_new_type("seq_t", [("id", AlgebraicType::I64)], true) + .with_auto_inc_primary_key(0) + .with_index_no_accessor_name(RawIndexAlgorithm::BTree { columns: 0.into() }) + .with_access(TableAccess::Public) + .finish(); + b.finish().try_into().expect("valid module v1") + }; + + // Define the module that we're migrating to. + let module_v2: ModuleDef = { + let mut b = RawModuleDefV9Builder::new(); + b.build_table_with_new_type( + "seq_t", + [("id", AlgebraicType::I64), ("payload", AlgebraicType::U64)], + true, + ) + .with_auto_inc_primary_key(0) + .with_index_no_accessor_name(btree(0)) + .with_access(TableAccess::Public) + .with_default_column_value(1, product![0u64].into()) + .finish(); + b.finish().try_into().expect("valid module v2") + }; + + // helper to insert + collect sorted ids + let insert_and_collect_ids = |stdb: &TestDB, payload: AlgebraicValue| -> anyhow::Result> { + let mut tx = begin_mut_tx(stdb); + let table_id = stdb.table_id_from_name_mut(&tx, "seq_t")?.expect("seq_t should exist"); + + insert(stdb, &mut tx, table_id, &payload)?; + + let mut ids = stdb + .iter_mut(&tx, table_id)? + .map(|r| r.read_col::(0)) + .collect::, _>>()?; + + ids.sort(); + stdb.commit_tx(tx)?; + Ok(ids) + }; + + // Create the old tables and insert two rows + // that use the auto-inc sequence. + { + let mut tx = begin_mut_tx(&stdb); + + for def in module_v1.tables() { + create_table_from_def(&stdb, &mut tx, &module_v1, def)?; + } + + let table_id = stdb.table_id_from_name_mut(&tx, "seq_t")?.expect("seq_t should exist"); + + insert(&stdb, &mut tx, table_id, &product![0i64])?; + insert(&stdb, &mut tx, table_id, &product![0i64])?; + + stdb.commit_tx(tx)?; + } + + // Successfully update the database to the new module. + { + let mut tx = begin_mut_tx(&stdb); + + let plan = ponder_migrate(&module_v1, &module_v2)?; + let res = update_database(&stdb, &mut tx, auth_ctx, plan, &TestLogger)?; + + assert!(matches!( + res, + UpdateResult::Success | UpdateResult::RequiresClientDisconnect + )); + + stdb.commit_tx(tx)?; + } + + // Check that the new table has reused the sequence + // from the old table such that the last row has the value 3. + { + let ids = insert_and_collect_ids(&stdb, product![0i64, 99u64].into())?; + assert!( + ids.iter().last().unwrap() == &3, + "expected id 3 after migration, got {ids:?}" + ); + } + + // Check that we can replay. + let stdb = stdb.reopen()?; + + // After replay, the allocation cursor should be preserved. + { + let ids = insert_and_collect_ids(&stdb, product![0i64, 99u64].into())?; + assert!( + ids.iter().last().unwrap() == &4097, + "expected id 4097 after reopen, got {ids:?}" + ); + } + + Ok(()) + } } diff --git a/crates/datastore/src/locking_tx_datastore/mut_tx.rs b/crates/datastore/src/locking_tx_datastore/mut_tx.rs index d73f794e0..9008bc6da 100644 --- a/crates/datastore/src/locking_tx_datastore/mut_tx.rs +++ b/crates/datastore/src/locking_tx_datastore/mut_tx.rs @@ -1015,6 +1015,27 @@ impl MutTxId { Ok(ret) } + fn update_st_sequence_row( + &mut self, + sequence_id: SequenceId, + updater: impl FnOnce(&mut StSequenceRow) -> R, + ) -> Result { + // Fetch the row. + let st_sequence_ref = self + .iter_by_col_eq(ST_SEQUENCE_ID, StSequenceFields::SequenceId, &sequence_id.into())? + .last() + .ok_or(SequenceError::NotFound(sequence_id))?; + let ptr = st_sequence_ref.pointer(); + let mut row = StSequenceRow::try_from(st_sequence_ref)?; + + // Delete the row, run updates, and insert again. + self.delete(ST_SEQUENCE_ID, ptr)?; + let ret = updater(&mut row); + self.insert_via_serialize_bsatn(ST_SEQUENCE_ID, &row)?; + + Ok(ret) + } + pub fn view_id_from_name(&self, view_name: &str) -> Result> { let view_name = &view_name.into(); let row = self @@ -1207,19 +1228,20 @@ impl MutTxId { // Store sequence values to restore them later with new table. // Using a map from name to value as the new sequence ids will be different. // and I am not sure if we should rely on the order of sequences in the table schema. - let seq_values: HashMap<_, i128> = original_table_schema - .sequences - .iter() - .map(|s| { - ( - s.sequence_name.clone(), - self.sequence_state_lock - .get_sequence_mut(s.sequence_id) - .expect("sequence exists in original schema and should in sequence state.") - .get_value(), - ) - }) - .collect(); + let mut seq_values: HashMap<_, (i128, i128)> = HashMap::default(); + for seq in &original_table_schema.sequences { + let value = self + .sequence_state_lock + .get_sequence_mut(seq.sequence_id) + .expect("sequence exists in original schema and should in sequence state.") + .get_value(); + let allocated = self + .iter_by_col_eq(ST_SEQUENCE_ID, StSequenceFields::SequenceId, &seq.sequence_id.into())? + .last() + .ok_or(SequenceError::NotFound(seq.sequence_id))? + .read_col(StSequenceFields::Allocated)?; + seq_values.insert(seq.sequence_name.clone(), (value, allocated)); + } // Drop existing table first due to unique constraints on table name in `st_table` self.drop_table(table_id)?; @@ -1249,23 +1271,40 @@ impl MutTxId { Ok(new_table_id) } + /// Recreate a table and restore sequence runtime state after a destructive + /// schema change (for example `add_columns_to_table`). + /// + /// `create_table(...)` generates fresh table/sequence IDs and inserts fresh + /// rows into `st_sequence`. We then restore preserved `(value, allocated)` + /// by sequence name: + /// - update in-memory sequence state (`SequencesState`) so this process keeps + /// allocating from the same point; + /// - patch the newly created `st_sequence` row so reopen/replay restores the + /// same allocation cursor instead of sequence start. fn create_table_and_update_seq( &mut self, table_schema: TableSchema, - seq_values: HashMap, + seq_values: HashMap, ) -> Result { let table_id = self.create_table(table_schema)?; let table_schema = self.schema_for_table(table_id)?; for seq in table_schema.sequences.iter() { - let new_seq = self - .sequence_state_lock - .get_sequence_mut(seq.sequence_id) - .expect("sequence just created"); - let value = *seq_values + let (value, allocated) = *seq_values .get(&seq.sequence_name) .ok_or_else(|| SequenceError::NotFound(seq.sequence_id))?; - new_seq.update_value(value); + { + let new_seq = self + .sequence_state_lock + .get_sequence_mut(seq.sequence_id) + .expect("sequence just created"); + new_seq.update_value(value); + new_seq.update_allocation(allocated); + } + + // This updates the new `st_sequence` row created by `create_table(...)` + // above (old table rows are already dropped). + self.update_st_sequence_row(seq.sequence_id, |st| st.allocated = allocated)?; } Ok(table_id) diff --git a/crates/datastore/src/locking_tx_datastore/sequence.rs b/crates/datastore/src/locking_tx_datastore/sequence.rs index 607eacb95..1d29dda29 100644 --- a/crates/datastore/src/locking_tx_datastore/sequence.rs +++ b/crates/datastore/src/locking_tx_datastore/sequence.rs @@ -82,6 +82,17 @@ impl Sequence { self.value = new_value; } + /// Update the persisted allocation cursor for the sequence. + pub(super) fn update_allocation(&mut self, new_allocated: i128) { + if !(self.schema.min_value..=self.schema.max_value).contains(&new_allocated) { + panic!( + "Invalid sequence allocation update: new allocated {} is out of bounds for sequence with min_value {} and max_value {}", + new_allocated, self.schema.min_value, self.schema.max_value + ); + } + self.allocated = new_allocated; + } + pub(super) fn get_value(&self) -> i128 { self.value }