mirror of
https://github.com/clockworklabs/SpacetimeDB.git
synced 2026-05-10 09:40:23 -04:00
Insert into accessor system tables
This commit is contained in:
@@ -1313,6 +1313,7 @@ mod tests {
|
||||
columns_to_row_type, ColumnSchema, ConstraintSchema, IndexSchema, RowLevelSecuritySchema, ScheduleSchema,
|
||||
SequenceSchema,
|
||||
};
|
||||
use spacetimedb_schema::table_name::TableName;
|
||||
|
||||
/// For the first user-created table, sequences in the system tables start
|
||||
/// from this value.
|
||||
|
||||
@@ -20,11 +20,13 @@ use crate::{
|
||||
use crate::{
|
||||
error::{IndexError, SequenceError, TableError},
|
||||
system_tables::{
|
||||
with_sys_table_buf, StClientFields, StClientRow, StColumnFields, StColumnRow, StConstraintFields,
|
||||
StConstraintRow, StEventTableRow, StFields as _, StIndexFields, StIndexRow, StRowLevelSecurityFields,
|
||||
StRowLevelSecurityRow, StScheduledFields, StScheduledRow, StSequenceFields, StSequenceRow, StTableFields,
|
||||
StTableRow, SystemTable, ST_CLIENT_ID, ST_COLUMN_ID, ST_CONSTRAINT_ID, ST_EVENT_TABLE_ID, ST_INDEX_ID,
|
||||
ST_ROW_LEVEL_SECURITY_ID, ST_SCHEDULED_ID, ST_SEQUENCE_ID, ST_TABLE_ID,
|
||||
with_sys_table_buf, StClientFields, StClientRow, StColumnAccessorFields, StColumnAccessorRow, StColumnFields,
|
||||
StColumnRow, StConstraintFields, StConstraintRow, StEventTableRow, StFields as _, StIndexAccessorFields,
|
||||
StIndexAccessorRow, StIndexFields, StIndexRow, StRowLevelSecurityFields, StRowLevelSecurityRow,
|
||||
StScheduledFields, StScheduledRow, StSequenceFields, StSequenceRow, StTableAccessorFields, StTableAccessorRow,
|
||||
StTableFields, StTableRow, SystemTable, ST_CLIENT_ID, ST_COLUMN_ACCESSOR_ID, ST_COLUMN_ID, ST_CONSTRAINT_ID,
|
||||
ST_EVENT_TABLE_ID, ST_INDEX_ACCESSOR_ID, ST_INDEX_ID, ST_ROW_LEVEL_SECURITY_ID, ST_SCHEDULED_ID,
|
||||
ST_SEQUENCE_ID, ST_TABLE_ACCESSOR_ID, ST_TABLE_ID,
|
||||
},
|
||||
};
|
||||
use crate::{execution_context::ExecutionContext, system_tables::StViewColumnRow};
|
||||
@@ -59,6 +61,7 @@ use spacetimedb_sats::{
|
||||
};
|
||||
use spacetimedb_schema::{
|
||||
def::{ModuleDef, ViewColumnDef, ViewDef, ViewParamDef},
|
||||
identifier::Identifier,
|
||||
reducer_name::ReducerName,
|
||||
schema::{ColumnSchema, ConstraintSchema, IndexSchema, RowLevelSecuritySchema, SequenceSchema, TableSchema},
|
||||
table_name::TableName,
|
||||
@@ -645,6 +648,7 @@ impl MutTxId {
|
||||
.read_col(StTableFields::TableId)?;
|
||||
|
||||
table_schema.update_table_id(table_id);
|
||||
self.insert_st_table_accessor(&table_name, table_schema.alias.as_ref())?;
|
||||
|
||||
if let Some(info) = table_schema.view_info.as_mut() {
|
||||
info.view_id = self.insert_into_st_view(table_name.clone(), table_id, true, info.is_anonymous)?;
|
||||
@@ -653,7 +657,7 @@ impl MutTxId {
|
||||
// Generate the full definition of the table, with the generated indexes, constraints, sequences...
|
||||
|
||||
// Insert the columns into `st_column`.
|
||||
self.insert_st_column(table_schema.columns())?;
|
||||
self.insert_st_column(&table_name, table_schema.columns())?;
|
||||
|
||||
let schedule = table_schema.schedule.clone();
|
||||
let is_event = table_schema.is_event;
|
||||
@@ -714,15 +718,61 @@ impl MutTxId {
|
||||
Ok(table_id)
|
||||
}
|
||||
|
||||
/// Insert `columns` into `st_column`.
|
||||
fn insert_st_column(&mut self, columns: &[ColumnSchema]) -> Result<()> {
|
||||
/// Insert `columns` into `st_column`, and their accessors into `st_column_accessor`.
|
||||
fn insert_st_column(&mut self, table_name: &TableName, columns: &[ColumnSchema]) -> Result<()> {
|
||||
columns.iter().try_for_each(|col| {
|
||||
let row: StColumnRow = col.clone().into();
|
||||
self.insert_via_serialize_bsatn(ST_COLUMN_ID, &row)?;
|
||||
self.insert_st_column_accessor(table_name, &col.col_name, col.alias.as_ref())?;
|
||||
Ok(())
|
||||
})
|
||||
}
|
||||
|
||||
/// Insert a row into `st_table_accessor` for `table_name`, if an alias is present.
|
||||
fn insert_st_table_accessor(&mut self, table_name: &TableName, alias: Option<&Identifier>) -> Result<()> {
|
||||
let Some(accessor_name) = alias.cloned() else {
|
||||
return Ok(());
|
||||
};
|
||||
let row = StTableAccessorRow {
|
||||
table_name: table_name.clone(),
|
||||
accessor_name,
|
||||
};
|
||||
self.insert_via_serialize_bsatn(ST_TABLE_ACCESSOR_ID, &row)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Insert a row into `st_column_accessor` for `(table_name, col_name)`, if an alias is present.
|
||||
fn insert_st_column_accessor(
|
||||
&mut self,
|
||||
table_name: &TableName,
|
||||
col_name: &Identifier,
|
||||
alias: Option<&Identifier>,
|
||||
) -> Result<()> {
|
||||
let Some(accessor_name) = alias.cloned() else {
|
||||
return Ok(());
|
||||
};
|
||||
let row = StColumnAccessorRow {
|
||||
table_name: table_name.clone(),
|
||||
col_name: col_name.clone(),
|
||||
accessor_name,
|
||||
};
|
||||
self.insert_via_serialize_bsatn(ST_COLUMN_ACCESSOR_ID, &row)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Insert a row into `st_index_accessor` for `index_name`, if an alias is present.
|
||||
fn insert_st_index_accessor(&mut self, index_name: &RawIdentifier, alias: Option<&RawIdentifier>) -> Result<()> {
|
||||
let Some(accessor_name) = alias.cloned() else {
|
||||
return Ok(());
|
||||
};
|
||||
let row = StIndexAccessorRow {
|
||||
index_name: index_name.clone(),
|
||||
accessor_name,
|
||||
};
|
||||
self.insert_via_serialize_bsatn(ST_INDEX_ACCESSOR_ID, &row)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn lookup_st_view(&self, view_id: ViewId) -> Result<StViewRow> {
|
||||
let row = self
|
||||
.iter_by_col_eq(ST_VIEW_ID, StViewFields::ViewId, &view_id.into())?
|
||||
@@ -850,11 +900,33 @@ impl MutTxId {
|
||||
self.delete_col_eq(ST_COLUMN_ID, StColumnFields::TableId.col_id(), &table_id.into())
|
||||
}
|
||||
|
||||
/// Drops rows in `st_column_accessor` for this canonical `table_name`.
|
||||
fn drop_st_column_accessor(&mut self, table_name: &TableName) -> Result<()> {
|
||||
let value = table_name.as_ref().into();
|
||||
self.delete_col_eq(
|
||||
ST_COLUMN_ACCESSOR_ID,
|
||||
StColumnAccessorFields::TableName.col_id(),
|
||||
&value,
|
||||
)
|
||||
}
|
||||
|
||||
/// Drops the row in `st_table` for this `table_id`
|
||||
fn drop_st_table(&mut self, table_id: TableId) -> Result<()> {
|
||||
self.delete_col_eq(ST_TABLE_ID, StTableFields::TableId.col_id(), &table_id.into())
|
||||
}
|
||||
|
||||
/// Drops rows in `st_table_accessor` for this canonical `table_name`.
|
||||
fn drop_st_table_accessor(&mut self, table_name: &TableName) -> Result<()> {
|
||||
let value = table_name.as_ref().into();
|
||||
self.delete_col_eq(ST_TABLE_ACCESSOR_ID, StTableAccessorFields::TableName.col_id(), &value)
|
||||
}
|
||||
|
||||
/// Drops rows in `st_index_accessor` for this canonical `index_name`.
|
||||
fn drop_st_index_accessor(&mut self, index_name: &RawIdentifier) -> Result<()> {
|
||||
let value = index_name.as_ref().into();
|
||||
self.delete_col_eq(ST_INDEX_ACCESSOR_ID, StIndexAccessorFields::IndexName.col_id(), &value)
|
||||
}
|
||||
|
||||
/// Drops the row in `st_view` for this `view_id`
|
||||
fn drop_st_view(&mut self, view_id: ViewId) -> Result<()> {
|
||||
self.delete_col_eq(ST_VIEW_ID, StViewFields::ViewId.col_id(), &view_id.into())
|
||||
@@ -893,6 +965,8 @@ impl MutTxId {
|
||||
}
|
||||
|
||||
// Drop the table and their columns
|
||||
self.drop_st_table_accessor(&schema.table_name)?;
|
||||
self.drop_st_column_accessor(&schema.table_name)?;
|
||||
self.drop_st_table(table_id)?;
|
||||
self.drop_st_column(table_id)?;
|
||||
|
||||
@@ -1045,8 +1119,10 @@ impl MutTxId {
|
||||
// Update system tables.
|
||||
// We'll simply remove all rows in `st_columns` and then add the new ones.
|
||||
// The datastore takes care of not persisting any no-op delete/inserts to the commitlog.
|
||||
let table_name = self.find_st_table_row(table_id)?.table_name;
|
||||
self.drop_st_column(table_id)?;
|
||||
self.insert_st_column(&column_schemas)?;
|
||||
self.drop_st_column_accessor(&table_name)?;
|
||||
self.insert_st_column(&table_name, &column_schemas)?;
|
||||
|
||||
// Remember the pending change so we can undo if necessary.
|
||||
self.push_schema_change(PendingSchemaChange::TableAlterRowType(table_id, old_column_schemas));
|
||||
@@ -1214,6 +1290,7 @@ impl MutTxId {
|
||||
.collapse()
|
||||
.read_col(StIndexFields::IndexId)?;
|
||||
index_schema.index_id = index_id;
|
||||
self.insert_st_index_accessor(&index_schema.index_name, index_schema.alias.as_ref())?;
|
||||
|
||||
// Add the index to the transaction's insert table.
|
||||
let ((table, blob_store, delete_table), (commit_table, commit_blob_store, idx_map)) =
|
||||
@@ -1285,6 +1362,7 @@ impl MutTxId {
|
||||
|
||||
// Remove the index from st_indexes.
|
||||
self.delete(ST_INDEX_ID, st_index_ptr)?;
|
||||
self.drop_st_index_accessor(&st_index_row.index_name)?;
|
||||
|
||||
// Remove the index in the transaction's insert table and the commit table.
|
||||
let ((tx_table, tx_bs, _), (commit_table, commit_bs, idx_map)) =
|
||||
|
||||
@@ -120,7 +120,7 @@ pub trait StateView {
|
||||
// before `migrate_system_tables` creates newer system tables.
|
||||
// We therefore treat a missing `st_column_accessor` as "no aliases yet".
|
||||
//
|
||||
// Note this is different behavior from `find_st_table_accessor_row1`,
|
||||
// Note this is different behavior from `find_st_table_accessor_row`,
|
||||
// because that utility is used for name resolution **after** startup,
|
||||
// where missing accessor tables should be surfaced as real errors.
|
||||
Err(DatastoreError::Table(TableError::IdNotFound(..))) => Ok(None),
|
||||
@@ -227,6 +227,22 @@ pub trait StateView {
|
||||
Err(DatastoreError::Table(TableError::IdNotFound(..))) => false,
|
||||
Err(e) => return Err(e),
|
||||
};
|
||||
// During restore from snapshots produced before `st_table_accessor` existed,
|
||||
// this system table is missing until `migrate_system_tables` runs.
|
||||
// Handle that here so schema reconstruction can proceed during restore.
|
||||
let table_alias = match self.iter_by_col_eq(
|
||||
ST_TABLE_ACCESSOR_ID,
|
||||
StTableAccessorFields::TableName,
|
||||
&table_name.as_ref().into(),
|
||||
) {
|
||||
Ok(mut iter) => iter
|
||||
.next()
|
||||
.map(StTableAccessorRow::try_from)
|
||||
.transpose()?
|
||||
.map(|row| row.accessor_name),
|
||||
Err(DatastoreError::Table(TableError::IdNotFound(..))) => None,
|
||||
Err(e) => return Err(e),
|
||||
};
|
||||
Ok(TableSchema::new(
|
||||
table_id,
|
||||
table_name,
|
||||
@@ -240,8 +256,7 @@ pub trait StateView {
|
||||
schedule,
|
||||
table_primary_key,
|
||||
is_event,
|
||||
//TODO: fetch it from system table
|
||||
None,
|
||||
table_alias,
|
||||
))
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user