mirror of
https://github.com/clockworklabs/SpacetimeDB.git
synced 2026-05-12 10:48:19 -04:00
Add fixup to remove duplicate st_sequence rows for improperly migrated system sequences
This commit is contained in:
@@ -353,6 +353,12 @@ impl RelationalDB {
|
||||
self.database_identity,
|
||||
schema.table_name
|
||||
);
|
||||
// FIXME: `create_table` of tables with sequences
|
||||
// gives a different initial allocation than is given by `CommittedState::bootstrap_system_tables`.
|
||||
// See comment in that method.
|
||||
// This results in requiring `CommittedState::fixup_delete_duplicate_system_sequence_rows`.
|
||||
// Fix `migrate_system_tables` to create new system sequences
|
||||
// with the same initial allocation as `bootstrap_system_tables`.
|
||||
let _ = self.create_table(&mut tx, schema.clone())?;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -16,12 +16,13 @@ use crate::{
|
||||
IterByColRangeTx,
|
||||
},
|
||||
system_tables::{
|
||||
is_built_in_meta_row, system_tables, StColumnRow, StConstraintData, StConstraintRow, StIndexRow, StSequenceRow,
|
||||
StTableFields, StTableRow, StViewRow, SystemTable, ST_CLIENT_ID, ST_CLIENT_IDX, ST_COLUMN_ID, ST_COLUMN_IDX,
|
||||
ST_COLUMN_NAME, ST_CONSTRAINT_ID, ST_CONSTRAINT_IDX, ST_CONSTRAINT_NAME, ST_INDEX_ID, ST_INDEX_IDX,
|
||||
ST_INDEX_NAME, ST_MODULE_ID, ST_MODULE_IDX, ST_ROW_LEVEL_SECURITY_ID, ST_ROW_LEVEL_SECURITY_IDX,
|
||||
ST_SCHEDULED_ID, ST_SCHEDULED_IDX, ST_SEQUENCE_ID, ST_SEQUENCE_IDX, ST_SEQUENCE_NAME, ST_TABLE_ID,
|
||||
ST_TABLE_IDX, ST_VAR_ID, ST_VAR_IDX, ST_VIEW_ARG_ID, ST_VIEW_ARG_IDX,
|
||||
is_built_in_meta_row, system_tables, table_id_is_reserved, StColumnRow, StConstraintData, StConstraintRow,
|
||||
StIndexRow, StSequenceFields, StSequenceRow, StTableFields, StTableRow, StViewRow, SystemTable, ST_CLIENT_ID,
|
||||
ST_CLIENT_IDX, ST_COLUMN_ID, ST_COLUMN_IDX, ST_COLUMN_NAME, ST_CONSTRAINT_ID, ST_CONSTRAINT_IDX,
|
||||
ST_CONSTRAINT_NAME, ST_INDEX_ID, ST_INDEX_IDX, ST_INDEX_NAME, ST_MODULE_ID, ST_MODULE_IDX,
|
||||
ST_ROW_LEVEL_SECURITY_ID, ST_ROW_LEVEL_SECURITY_IDX, ST_SCHEDULED_ID, ST_SCHEDULED_IDX, ST_SEQUENCE_ID,
|
||||
ST_SEQUENCE_IDX, ST_SEQUENCE_NAME, ST_TABLE_ID, ST_TABLE_IDX, ST_VAR_ID, ST_VAR_IDX, ST_VIEW_ARG_ID,
|
||||
ST_VIEW_ARG_IDX,
|
||||
},
|
||||
traits::{EphemeralTables, TxData},
|
||||
};
|
||||
@@ -34,10 +35,10 @@ use crate::{
|
||||
};
|
||||
use anyhow::anyhow;
|
||||
use core::{convert::Infallible, ops::RangeBounds};
|
||||
use spacetimedb_data_structures::map::{HashSet, IntMap, IntSet};
|
||||
use spacetimedb_data_structures::map::{HashMap, HashSet, IntMap, IntSet};
|
||||
use spacetimedb_durability::TxOffset;
|
||||
use spacetimedb_lib::{db::auth::StTableType, Identity};
|
||||
use spacetimedb_primitives::{ColId, ColList, ColSet, IndexId, TableId, ViewId};
|
||||
use spacetimedb_primitives::{ColId, ColList, ColSet, IndexId, SequenceId, TableId, ViewId};
|
||||
use spacetimedb_sats::{algebraic_value::de::ValueDeserializer, memory_usage::MemoryUsage, Deserialize};
|
||||
use spacetimedb_sats::{AlgebraicValue, ProductValue};
|
||||
use spacetimedb_schema::{
|
||||
@@ -201,6 +202,93 @@ impl CommittedState {
|
||||
}
|
||||
}
|
||||
|
||||
/// Delete all but the highest-allocation `st_sequence` row for each system sequence.
|
||||
///
|
||||
/// Prior versions of `RelationalDb::migrate_system_tables` (defined in the `core` crate)
|
||||
/// initialized newly-created system sequences to `allocation: 4097`,
|
||||
/// while `committed_state::bootstrap_system_tables` sets `allocation: 4096`.
|
||||
/// This affected the system table migration which added
|
||||
/// `st_view_view_id_seq` and `st_view_arg_id_seq`.
|
||||
/// As a result, when replaying these databases' commitlogs without a snapshot,
|
||||
/// we will end up with two rows in `st_sequence` for each of these sequences,
|
||||
/// resulting in a unique constraint violation in `CommittedState::build_indexes`.
|
||||
/// We call this method in [`super::datastore::Locking::rebuild_state_after_replay`]
|
||||
/// to avoid that unique constraint violation.
|
||||
pub(super) fn fixup_delete_duplicate_system_sequence_rows(&mut self) {
|
||||
struct StSequenceRowInfo {
|
||||
sequence_id: SequenceId,
|
||||
allocated: i128,
|
||||
row_pointer: RowPointer,
|
||||
}
|
||||
|
||||
// Get all the `st_sequence` rows which refer to sequences on system tables,
|
||||
// including any duplicates caused by the bug described above.
|
||||
let sequence_rows = self
|
||||
.table_scan(ST_SEQUENCE_ID)
|
||||
.expect("`st_sequence` should exist")
|
||||
.filter_map(|row_ref| {
|
||||
// Read the table ID to which the sequence refers,
|
||||
// in order to determine if this is a system sequence or not.
|
||||
let table_id = row_ref
|
||||
.read_col::<TableId>(StSequenceFields::TableId)
|
||||
.expect("`st_sequence` row should conform to `st_sequence` schema");
|
||||
|
||||
// If this sequence refers to a system table, it may need a fixup.
|
||||
// User tables' sequences will never need fixups.
|
||||
table_id_is_reserved(table_id).then(|| {
|
||||
let allocated = row_ref
|
||||
.read_col::<i128>(StSequenceFields::Allocated)
|
||||
.expect("`st_sequence` row should conform to `st_sequence` schema");
|
||||
let sequence_id = row_ref
|
||||
.read_col::<SequenceId>(StSequenceFields::SequenceId)
|
||||
.expect("`st_sequence` row should conform to `st_sequence` schema");
|
||||
StSequenceRowInfo {
|
||||
allocated,
|
||||
sequence_id,
|
||||
row_pointer: row_ref.pointer(),
|
||||
}
|
||||
})
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
let (st_sequence, blob_store, ..) = self
|
||||
.get_table_and_blob_store_mut(ST_SEQUENCE_ID)
|
||||
.expect("`st_sequence` should exist");
|
||||
|
||||
// Track the row with the highest allocation for each sequence.
|
||||
let mut highest_allocations: HashMap<SequenceId, (i128, RowPointer)> = HashMap::default();
|
||||
|
||||
for StSequenceRowInfo {
|
||||
sequence_id,
|
||||
allocated,
|
||||
row_pointer,
|
||||
} in sequence_rows
|
||||
{
|
||||
// For each `st_sequence` row which refers to a system table,
|
||||
// if we've already seen a row for the same sequence,
|
||||
// keep only the row with the higher allocation.
|
||||
if let Some((prev_allocated, prev_row_pointer)) =
|
||||
highest_allocations.insert(sequence_id, (allocated, row_pointer))
|
||||
{
|
||||
// We have a duplicate row. We want to keep whichever has the higher `allocated`,
|
||||
// and delete the other.
|
||||
let row_pointer_to_delete = if prev_allocated > allocated {
|
||||
// The previous row has a higher allocation than the new row,
|
||||
// so delete the new row and restore `previous` to `highest_allocations`.
|
||||
highest_allocations.insert(sequence_id, (prev_allocated, prev_row_pointer));
|
||||
row_pointer
|
||||
} else {
|
||||
// The previous row does not have a higher allocation than the new,
|
||||
// so delete the previous row and keep the new one.
|
||||
prev_row_pointer
|
||||
};
|
||||
|
||||
st_sequence.delete(blob_store, row_pointer_to_delete, |_| ())
|
||||
.expect("Duplicated `st_sequence` row at `row_pointer_to_delete` should be present in `st_sequence` during fixup");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Extremely delicate function to bootstrap the system tables.
|
||||
/// Don't update this unless you know what you're doing.
|
||||
pub(super) fn bootstrap_system_tables(&mut self, database_identity: Identity) -> Result<()> {
|
||||
@@ -442,11 +530,12 @@ impl CommittedState {
|
||||
Err(InsertError::IndexError(e)) => return Err(IndexError::UniqueConstraintViolation(e).into()),
|
||||
};
|
||||
|
||||
let row_ptr = row_ref.pointer();
|
||||
|
||||
if table_id == ST_COLUMN_ID {
|
||||
// We've made a modification to `st_column`.
|
||||
// The type of a table has changed, so figure out which.
|
||||
// The first column in `StColumnRow` is `table_id`.
|
||||
let row_ptr = row_ref.pointer();
|
||||
self.st_column_changed(row, row_ptr)?;
|
||||
}
|
||||
|
||||
|
||||
@@ -141,6 +141,18 @@ impl Locking {
|
||||
/// There may eventually be better way to do this, but this will have to do for now.
|
||||
pub fn rebuild_state_after_replay(&self) -> Result<()> {
|
||||
let mut committed_state = self.committed_state.write_arc();
|
||||
|
||||
// Prior versions of `RelationalDb::migrate_system_tables` (defined in the `core` crate)
|
||||
// initialized newly-created system sequences to `allocation: 4097`,
|
||||
// while `committed_state::bootstrap_system_tables` sets `allocation: 4096`.
|
||||
// This affected the system table migration which added
|
||||
// `st_view_view_id_seq` and `st_view_arg_id_seq`.
|
||||
// As a result, when replaying these databases' commitlogs without a snapshot,
|
||||
// we will end up with two rows in `st_sequence` for each of these sequences,
|
||||
// resulting in a unique constraint violation in `CommittedState::build_indexes`.
|
||||
// We fix this by, for each system sequence, deleting all but the row with the highest allocation.
|
||||
committed_state.fixup_delete_duplicate_system_sequence_rows();
|
||||
|
||||
// `build_missing_tables` must be called before indexes.
|
||||
// Honestly this should maybe just be one big procedure.
|
||||
// See John Carmack's philosophy on this.
|
||||
|
||||
Reference in New Issue
Block a user