mirror of
https://github.com/clockworklabs/SpacetimeDB.git
synced 2026-05-06 07:26:43 -04:00
fix: update seq table when migrating table. (#4902)
# Description of Changes `MutTxId::add_columns_to_table` creates new table but only copies sequences to in-memory state, which causes `autoinc` columns to reset on module restart. The existing implementation relies on `create_table_and_update_seq` helper, which only updates the sequence state in memory. This change ensures that the `allocation` is also persisted to the system table, keeping it consistent across restarts. # API and ABI breaking changes NA # Expected complexity level and risk 2 # Testing Added a test, which migrate table and checks for `autoinc` column value without and with restart. --------- Signed-off-by: Shubham Mishra <shivam828787@gmail.com> Co-authored-by: Mazdak Farrokhzad <twingoow@gmail.com>
This commit is contained in:
@@ -340,7 +340,7 @@ mod test {
|
||||
host::module_host::create_table_from_def,
|
||||
};
|
||||
use spacetimedb_datastore::locking_tx_datastore::PendingSchemaChange;
|
||||
use spacetimedb_lib::db::raw_def::v9::{btree, RawModuleDefV9Builder, TableAccess};
|
||||
use spacetimedb_lib::db::raw_def::v9::{btree, RawIndexAlgorithm, RawModuleDefV9Builder, TableAccess};
|
||||
use spacetimedb_sats::{product, AlgebraicType, AlgebraicType::U64};
|
||||
use spacetimedb_schema::{auto_migrate::ponder_migrate, def::ModuleDef};
|
||||
|
||||
@@ -432,7 +432,7 @@ mod test {
|
||||
let stdb = TestDB::durable()?;
|
||||
|
||||
// Step 1: Table with a primary key (requires unique constraint + index).
|
||||
let module_v1 = {
|
||||
let module_v1: ModuleDef = {
|
||||
let mut builder = RawModuleDefV9Builder::new();
|
||||
builder
|
||||
.build_table_with_new_type("person", [("name", AlgebraicType::String)], true)
|
||||
@@ -446,7 +446,7 @@ mod test {
|
||||
};
|
||||
|
||||
// Step 2: Same table, but primary key removed.
|
||||
let module_v2 = {
|
||||
let module_v2: ModuleDef = {
|
||||
let mut builder = RawModuleDefV9Builder::new();
|
||||
builder
|
||||
.build_table_with_new_type("person", [("name", AlgebraicType::String)], true)
|
||||
@@ -580,4 +580,118 @@ mod test {
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Verifies that `autoinc` sequence survives a schema migration that adds a column,
|
||||
/// and is also correctly persisted across database replay.
|
||||
///
|
||||
/// Flow:
|
||||
/// - Create v1 schema and consume a few sequence values.
|
||||
/// - Migrate to v2 (adds a column with a default).
|
||||
/// - Ensure next insert continues the sequence (no reset).
|
||||
/// - Reopen DB and verify allocation cursor is still preserved.
|
||||
#[test]
|
||||
fn auto_inc_sequence_survives_add_column_migration() -> anyhow::Result<()> {
|
||||
let auth_ctx = AuthCtx::for_testing();
|
||||
let stdb = TestDB::durable()?;
|
||||
|
||||
// Define the old module that was before.
|
||||
let module_v1: ModuleDef = {
|
||||
let mut b = RawModuleDefV9Builder::new();
|
||||
b.build_table_with_new_type("seq_t", [("id", AlgebraicType::I64)], true)
|
||||
.with_auto_inc_primary_key(0)
|
||||
.with_index_no_accessor_name(RawIndexAlgorithm::BTree { columns: 0.into() })
|
||||
.with_access(TableAccess::Public)
|
||||
.finish();
|
||||
b.finish().try_into().expect("valid module v1")
|
||||
};
|
||||
|
||||
// Define the module that we're migrating to.
|
||||
let module_v2: ModuleDef = {
|
||||
let mut b = RawModuleDefV9Builder::new();
|
||||
b.build_table_with_new_type(
|
||||
"seq_t",
|
||||
[("id", AlgebraicType::I64), ("payload", AlgebraicType::U64)],
|
||||
true,
|
||||
)
|
||||
.with_auto_inc_primary_key(0)
|
||||
.with_index_no_accessor_name(btree(0))
|
||||
.with_access(TableAccess::Public)
|
||||
.with_default_column_value(1, product![0u64].into())
|
||||
.finish();
|
||||
b.finish().try_into().expect("valid module v2")
|
||||
};
|
||||
|
||||
// helper to insert + collect sorted ids
|
||||
let insert_and_collect_ids = |stdb: &TestDB, payload: AlgebraicValue| -> anyhow::Result<Vec<i64>> {
|
||||
let mut tx = begin_mut_tx(stdb);
|
||||
let table_id = stdb.table_id_from_name_mut(&tx, "seq_t")?.expect("seq_t should exist");
|
||||
|
||||
insert(stdb, &mut tx, table_id, &payload)?;
|
||||
|
||||
let mut ids = stdb
|
||||
.iter_mut(&tx, table_id)?
|
||||
.map(|r| r.read_col::<i64>(0))
|
||||
.collect::<Result<Vec<_>, _>>()?;
|
||||
|
||||
ids.sort();
|
||||
stdb.commit_tx(tx)?;
|
||||
Ok(ids)
|
||||
};
|
||||
|
||||
// Create the old tables and insert two rows
|
||||
// that use the auto-inc sequence.
|
||||
{
|
||||
let mut tx = begin_mut_tx(&stdb);
|
||||
|
||||
for def in module_v1.tables() {
|
||||
create_table_from_def(&stdb, &mut tx, &module_v1, def)?;
|
||||
}
|
||||
|
||||
let table_id = stdb.table_id_from_name_mut(&tx, "seq_t")?.expect("seq_t should exist");
|
||||
|
||||
insert(&stdb, &mut tx, table_id, &product![0i64])?;
|
||||
insert(&stdb, &mut tx, table_id, &product![0i64])?;
|
||||
|
||||
stdb.commit_tx(tx)?;
|
||||
}
|
||||
|
||||
// Successfully update the database to the new module.
|
||||
{
|
||||
let mut tx = begin_mut_tx(&stdb);
|
||||
|
||||
let plan = ponder_migrate(&module_v1, &module_v2)?;
|
||||
let res = update_database(&stdb, &mut tx, auth_ctx, plan, &TestLogger)?;
|
||||
|
||||
assert!(matches!(
|
||||
res,
|
||||
UpdateResult::Success | UpdateResult::RequiresClientDisconnect
|
||||
));
|
||||
|
||||
stdb.commit_tx(tx)?;
|
||||
}
|
||||
|
||||
// Check that the new table has reused the sequence
|
||||
// from the old table such that the last row has the value 3.
|
||||
{
|
||||
let ids = insert_and_collect_ids(&stdb, product![0i64, 99u64].into())?;
|
||||
assert!(
|
||||
ids.iter().last().unwrap() == &3,
|
||||
"expected id 3 after migration, got {ids:?}"
|
||||
);
|
||||
}
|
||||
|
||||
// Check that we can replay.
|
||||
let stdb = stdb.reopen()?;
|
||||
|
||||
// After replay, the allocation cursor should be preserved.
|
||||
{
|
||||
let ids = insert_and_collect_ids(&stdb, product![0i64, 99u64].into())?;
|
||||
assert!(
|
||||
ids.iter().last().unwrap() == &4097,
|
||||
"expected id 4097 after reopen, got {ids:?}"
|
||||
);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1015,6 +1015,27 @@ impl MutTxId {
|
||||
Ok(ret)
|
||||
}
|
||||
|
||||
fn update_st_sequence_row<R>(
|
||||
&mut self,
|
||||
sequence_id: SequenceId,
|
||||
updater: impl FnOnce(&mut StSequenceRow) -> R,
|
||||
) -> Result<R> {
|
||||
// Fetch the row.
|
||||
let st_sequence_ref = self
|
||||
.iter_by_col_eq(ST_SEQUENCE_ID, StSequenceFields::SequenceId, &sequence_id.into())?
|
||||
.last()
|
||||
.ok_or(SequenceError::NotFound(sequence_id))?;
|
||||
let ptr = st_sequence_ref.pointer();
|
||||
let mut row = StSequenceRow::try_from(st_sequence_ref)?;
|
||||
|
||||
// Delete the row, run updates, and insert again.
|
||||
self.delete(ST_SEQUENCE_ID, ptr)?;
|
||||
let ret = updater(&mut row);
|
||||
self.insert_via_serialize_bsatn(ST_SEQUENCE_ID, &row)?;
|
||||
|
||||
Ok(ret)
|
||||
}
|
||||
|
||||
pub fn view_id_from_name(&self, view_name: &str) -> Result<Option<ViewId>> {
|
||||
let view_name = &view_name.into();
|
||||
let row = self
|
||||
@@ -1207,19 +1228,20 @@ impl MutTxId {
|
||||
// Store sequence values to restore them later with new table.
|
||||
// Using a map from name to value as the new sequence ids will be different.
|
||||
// and I am not sure if we should rely on the order of sequences in the table schema.
|
||||
let seq_values: HashMap<_, i128> = original_table_schema
|
||||
.sequences
|
||||
.iter()
|
||||
.map(|s| {
|
||||
(
|
||||
s.sequence_name.clone(),
|
||||
self.sequence_state_lock
|
||||
.get_sequence_mut(s.sequence_id)
|
||||
.expect("sequence exists in original schema and should in sequence state.")
|
||||
.get_value(),
|
||||
)
|
||||
})
|
||||
.collect();
|
||||
let mut seq_values: HashMap<_, (i128, i128)> = HashMap::default();
|
||||
for seq in &original_table_schema.sequences {
|
||||
let value = self
|
||||
.sequence_state_lock
|
||||
.get_sequence_mut(seq.sequence_id)
|
||||
.expect("sequence exists in original schema and should in sequence state.")
|
||||
.get_value();
|
||||
let allocated = self
|
||||
.iter_by_col_eq(ST_SEQUENCE_ID, StSequenceFields::SequenceId, &seq.sequence_id.into())?
|
||||
.last()
|
||||
.ok_or(SequenceError::NotFound(seq.sequence_id))?
|
||||
.read_col(StSequenceFields::Allocated)?;
|
||||
seq_values.insert(seq.sequence_name.clone(), (value, allocated));
|
||||
}
|
||||
|
||||
// Drop existing table first due to unique constraints on table name in `st_table`
|
||||
self.drop_table(table_id)?;
|
||||
@@ -1249,23 +1271,40 @@ impl MutTxId {
|
||||
Ok(new_table_id)
|
||||
}
|
||||
|
||||
/// Recreate a table and restore sequence runtime state after a destructive
|
||||
/// schema change (for example `add_columns_to_table`).
|
||||
///
|
||||
/// `create_table(...)` generates fresh table/sequence IDs and inserts fresh
|
||||
/// rows into `st_sequence`. We then restore preserved `(value, allocated)`
|
||||
/// by sequence name:
|
||||
/// - update in-memory sequence state (`SequencesState`) so this process keeps
|
||||
/// allocating from the same point;
|
||||
/// - patch the newly created `st_sequence` row so reopen/replay restores the
|
||||
/// same allocation cursor instead of sequence start.
|
||||
fn create_table_and_update_seq(
|
||||
&mut self,
|
||||
table_schema: TableSchema,
|
||||
seq_values: HashMap<RawIdentifier, i128>,
|
||||
seq_values: HashMap<RawIdentifier, (i128, i128)>,
|
||||
) -> Result<TableId> {
|
||||
let table_id = self.create_table(table_schema)?;
|
||||
let table_schema = self.schema_for_table(table_id)?;
|
||||
|
||||
for seq in table_schema.sequences.iter() {
|
||||
let new_seq = self
|
||||
.sequence_state_lock
|
||||
.get_sequence_mut(seq.sequence_id)
|
||||
.expect("sequence just created");
|
||||
let value = *seq_values
|
||||
let (value, allocated) = *seq_values
|
||||
.get(&seq.sequence_name)
|
||||
.ok_or_else(|| SequenceError::NotFound(seq.sequence_id))?;
|
||||
new_seq.update_value(value);
|
||||
{
|
||||
let new_seq = self
|
||||
.sequence_state_lock
|
||||
.get_sequence_mut(seq.sequence_id)
|
||||
.expect("sequence just created");
|
||||
new_seq.update_value(value);
|
||||
new_seq.update_allocation(allocated);
|
||||
}
|
||||
|
||||
// This updates the new `st_sequence` row created by `create_table(...)`
|
||||
// above (old table rows are already dropped).
|
||||
self.update_st_sequence_row(seq.sequence_id, |st| st.allocated = allocated)?;
|
||||
}
|
||||
|
||||
Ok(table_id)
|
||||
|
||||
@@ -82,6 +82,17 @@ impl Sequence {
|
||||
self.value = new_value;
|
||||
}
|
||||
|
||||
/// Update the persisted allocation cursor for the sequence.
|
||||
pub(super) fn update_allocation(&mut self, new_allocated: i128) {
|
||||
if !(self.schema.min_value..=self.schema.max_value).contains(&new_allocated) {
|
||||
panic!(
|
||||
"Invalid sequence allocation update: new allocated {} is out of bounds for sequence with min_value {} and max_value {}",
|
||||
new_allocated, self.schema.min_value, self.schema.max_value
|
||||
);
|
||||
}
|
||||
self.allocated = new_allocated;
|
||||
}
|
||||
|
||||
pub(super) fn get_value(&self) -> i128 {
|
||||
self.value
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user