Datastore Extraction: Create DatastoreError type (#2798)

Signed-off-by: Tyler Cloutier <cloutiertyler@users.noreply.github.com>
This commit is contained in:
Tyler Cloutier
2025-05-27 23:09:03 -04:00
committed by GitHub
parent afc9e5b73b
commit 71f48ff561
12 changed files with 300 additions and 240 deletions
+161
View File
@@ -0,0 +1,161 @@
use super::system_tables::SystemTable;
use enum_as_inner::EnumAsInner;
use spacetimedb_lib::buffer::DecodeError;
use spacetimedb_lib::{
db::{
error::LibError,
raw_def::{v9::RawSql, RawIndexDefV8},
},
AlgebraicType, AlgebraicValue, ProductValue,
};
use spacetimedb_primitives::{ColId, ColList, IndexId, SequenceId, TableId};
use spacetimedb_sats::{product_value::InvalidFieldError, satn::Satn};
use spacetimedb_snapshot::SnapshotError;
use spacetimedb_table::{
bflatn_to, read_column,
table::{self, ReadViaBsatnError, UniqueConstraintViolation},
};
use thiserror::Error;
#[derive(Error, Debug, EnumAsInner)]
pub enum DatastoreError {
#[error("LibError: {0}")]
Lib(#[from] LibError),
#[error("TableError: {0}")]
Table(#[from] TableError),
#[error("IndexError: {0}")]
Index(#[from] IndexError),
#[error("SequenceError: {0}")]
Sequence(#[from] SequenceError),
#[error(transparent)]
// Box the inner [`SnapshotError`] to keep Clippy quiet about large `Err` variants.
Snapshot(#[from] Box<SnapshotError>),
// TODO(cloutiertyler): should this be a TableError? I couldn't get it to compile
#[error("Error reading a value from a table through BSATN: {0}")]
ReadViaBsatnError(#[from] ReadViaBsatnError),
#[error(transparent)]
Other(#[from] anyhow::Error),
}
#[derive(Error, Debug, EnumAsInner)]
pub enum TableError {
#[error("Table with name `{0}` start with 'st_' and that is reserved for internal system tables.")]
System(Box<str>),
#[error("Table with name `{0}` already exists.")]
Exist(String),
#[error("Table with name `{0}` not found.")]
NotFound(String),
#[error("Table with ID `{1}` not found in `{0}`.")]
IdNotFound(SystemTable, u32),
#[error("Sql `{1}` not found in `{0}`.")]
RawSqlNotFound(SystemTable, RawSql),
#[error("Table with ID `{0}` not found in `TxState`.")]
IdNotFoundState(TableId),
#[error("Column `{0}.{1}` is missing a name")]
ColumnWithoutName(String, ColId),
#[error("schema_for_table: Table has invalid schema: {0} Err: {1}")]
InvalidSchema(TableId, LibError),
#[error("Row has invalid row type for table: {0} Err: {1}", table_id, row.to_satn())]
RowInvalidType { table_id: TableId, row: ProductValue },
#[error("failed to decode row in table")]
RowDecodeError(DecodeError),
#[error("Column with name `{0}` already exists")]
DuplicateColumnName(String),
#[error("Column `{0}` not found")]
ColumnNotFound(ColId),
#[error(
"DecodeError for field `{0}.{1}`, expect `{2}` but found `{3}`",
table,
field,
expect,
found
)]
DecodeField {
table: String,
field: Box<str>,
expect: String,
found: String,
},
#[error(transparent)]
Bflatn(#[from] bflatn_to::Error),
#[error(transparent)]
Duplicate(#[from] table::DuplicateError),
#[error(transparent)]
ReadColTypeError(#[from] read_column::TypeError),
}
#[derive(Error, Debug, PartialEq, Eq)]
pub enum IndexError {
#[error("Index not found: {0:?}")]
NotFound(IndexId),
#[error("Column not found: {0:?}")]
ColumnNotFound(RawIndexDefV8),
#[error(transparent)]
UniqueConstraintViolation(#[from] UniqueConstraintViolation),
#[error("Attempt to define a index with more than 1 auto_inc column: Table: {0:?}, Columns: {1:?}")]
OneAutoInc(TableId, Vec<String>),
#[error("Could not decode arguments to index scan")]
Decode(DecodeError),
#[error("Index was not unique: {0:?}")]
NotUnique(IndexId),
#[error("Key {1:?} was not found in index {0:?}")]
KeyNotFound(IndexId, AlgebraicValue),
}
#[derive(Error, Debug, PartialEq, Eq)]
pub enum SequenceError {
#[error("Sequence with name `{0}` already exists.")]
Exist(String),
#[error("Sequence `{0}`: The increment is 0, and this means the sequence can't advance.")]
IncrementIsZero(String),
#[error("Sequence `{0}`: The min_value {1} must < max_value {2}.")]
MinMax(String, i128, i128),
#[error("Sequence `{0}`: The start value {1} must be >= min_value {2}.")]
MinStart(String, i128, i128),
#[error("Sequence `{0}`: The start value {1} must be <= min_value {2}.")]
MaxStart(String, i128, i128),
#[error("Sequence `{0}` failed to decode value from Sled (not a u128).")]
SequenceValue(String),
#[error("Sequence ID `{0}` not found.")]
NotFound(SequenceId),
#[error("Sequence applied to a non-integer field. Column `{col}` is of type {{found.to_sats()}}.")]
NotInteger { col: String, found: AlgebraicType },
#[error("Sequence ID `{0}` still had no values left after allocation.")]
UnableToAllocate(SequenceId),
#[error("Autoinc constraint on table {0:?} spans more than one column: {1:?}")]
MultiColumnAutoInc(TableId, ColList),
}
impl From<InvalidFieldError> for DatastoreError {
fn from(value: InvalidFieldError) -> Self {
LibError::from(value).into()
}
}
impl From<spacetimedb_table::read_column::TypeError> for DatastoreError {
fn from(err: spacetimedb_table::read_column::TypeError) -> Self {
TableError::from(err).into()
}
}
impl From<table::InsertError> for DatastoreError {
fn from(err: table::InsertError) -> Self {
match err {
table::InsertError::Duplicate(e) => TableError::from(e).into(),
table::InsertError::Bflatn(e) => TableError::from(e).into(),
table::InsertError::IndexError(e) => IndexError::from(e).into(),
}
}
}
impl From<bflatn_to::Error> for DatastoreError {
fn from(err: bflatn_to::Error) -> Self {
Self::Table(err.into())
}
}
impl From<SnapshotError> for DatastoreError {
fn from(e: SnapshotError) -> Self {
DatastoreError::Snapshot(Box::new(e))
}
}
@@ -9,6 +9,7 @@ use super::{
use crate::{
db::{
datastore::{
error::{IndexError, TableError},
system_tables::{
system_tables, StColumnRow, StConstraintData, StConstraintRow, StIndexRow, StSequenceRow,
StTableFields, StTableRow, SystemTable, ST_CLIENT_ID, ST_CLIENT_IDX, ST_COLUMN_ID, ST_COLUMN_IDX,
@@ -21,7 +22,6 @@ use crate::{
},
db_metrics::DB_METRICS,
},
error::{IndexError, TableError},
execution_context::ExecutionContext,
};
use anyhow::anyhow;
@@ -7,6 +7,7 @@ use super::{
tx_state::TxState,
};
use crate::db::datastore::{
error::{DatastoreError, TableError},
locking_tx_datastore::state_view::{IterByColRangeMutTx, IterMutTx, IterTx},
traits::{InsertFlags, UpdateFlags},
};
@@ -25,7 +26,6 @@ use crate::{
},
db_metrics::DB_METRICS,
},
error::{DBError, TableError},
execution_context::ExecutionContext,
};
use anyhow::{anyhow, Context};
@@ -47,7 +47,7 @@ use std::sync::Arc;
use std::time::{Duration, Instant};
use thiserror::Error;
pub type Result<T> = std::result::Result<T, DBError>;
pub type Result<T> = std::result::Result<T, DatastoreError>;
/// Struct contains various database states, each protected by
/// their own lock. To avoid deadlocks, it is crucial to acquire these locks
@@ -634,7 +634,7 @@ impl MutTxDatastore for Locking {
.map(|row| {
let ptr = row.pointer();
let row = StModuleRow::try_from(row)?;
Ok::<_, DBError>((ptr, row))
Ok::<_, DatastoreError>((ptr, row))
})
.transpose()?;
match old {
@@ -893,7 +893,7 @@ pub enum ReplayError {
#[error(transparent)]
Decode(#[from] bsatn::DecodeError),
#[error(transparent)]
Db(#[from] DBError),
Db(#[from] DatastoreError),
#[error(transparent)]
Any(#[from] anyhow::Error),
}
@@ -1137,6 +1137,7 @@ fn metadata_from_row(row: RowRef<'_>) -> Result<Metadata> {
#[cfg(test)]
mod tests {
use super::*;
use crate::db::datastore::error::IndexError;
use crate::db::datastore::locking_tx_datastore::tx_state::PendingSchemaChange;
use crate::db::datastore::system_tables::{
system_tables, StColumnRow, StConstraintData, StConstraintFields, StConstraintRow, StIndexAlgorithm,
@@ -1148,7 +1149,6 @@ mod tests {
};
use crate::db::datastore::traits::{IsolationLevel, MutTx};
use crate::db::datastore::Result;
use crate::error::{DBError, IndexError};
use bsatn::to_vec;
use core::{fmt, mem};
use itertools::Itertools;
@@ -2034,7 +2034,7 @@ mod tests {
insert(&datastore, &mut tx, table_id, &row)?;
let result = insert(&datastore, &mut tx, table_id, &row);
match result {
Err(DBError::Index(IndexError::UniqueConstraintViolation(_))) => (),
Err(DatastoreError::Index(IndexError::UniqueConstraintViolation(_))) => (),
_ => panic!("Expected an unique constraint violation error."),
}
#[rustfmt::skip]
@@ -2051,7 +2051,7 @@ mod tests {
let mut tx = begin_mut_tx(&datastore);
let result = insert(&datastore, &mut tx, table_id, &row);
match result {
Err(DBError::Index(IndexError::UniqueConstraintViolation(_))) => (),
Err(DatastoreError::Index(IndexError::UniqueConstraintViolation(_))) => (),
_ => panic!("Expected an unique constraint violation error."),
}
#[rustfmt::skip]
@@ -2139,7 +2139,7 @@ mod tests {
let row = u32_str_u32(0, "Bar", 18); // 0 will be ignored.
let result = insert(&datastore, &mut tx, table_id, &row);
match result {
Err(DBError::Index(IndexError::UniqueConstraintViolation(_))) => (),
Err(DatastoreError::Index(IndexError::UniqueConstraintViolation(_))) => (),
e => panic!("Expected an unique constraint violation error but got {e:?}."),
}
#[rustfmt::skip]
@@ -2164,7 +2164,7 @@ mod tests {
let row = u32_str_u32(0, "Bar", 18); // 0 will be ignored.
let result = insert(&datastore, &mut tx, table_id, &row);
match result {
Err(DBError::Index(IndexError::UniqueConstraintViolation(_))) => (),
Err(DatastoreError::Index(IndexError::UniqueConstraintViolation(_))) => (),
_ => panic!("Expected an unique constraint violation error."),
}
#[rustfmt::skip]
@@ -8,19 +8,19 @@ use super::{
tx_state::{IndexIdMap, PendingSchemaChange, TxState, TxTableForInsertion},
SharedMutexGuard, SharedWriteGuard,
};
use crate::db::datastore::system_tables::{
with_sys_table_buf, StClientFields, StClientRow, StColumnFields, StColumnRow, StConstraintFields, StConstraintRow,
StFields as _, StIndexFields, StIndexRow, StRowLevelSecurityFields, StRowLevelSecurityRow, StScheduledFields,
StScheduledRow, StSequenceFields, StSequenceRow, StTableFields, StTableRow, SystemTable, ST_CLIENT_ID,
ST_COLUMN_ID, ST_CONSTRAINT_ID, ST_INDEX_ID, ST_ROW_LEVEL_SECURITY_ID, ST_SCHEDULED_ID, ST_SEQUENCE_ID,
ST_TABLE_ID,
};
use crate::db::datastore::traits::{InsertFlags, RowTypeForTable, TxData, UpdateFlags};
use crate::execution_context::Workload;
use crate::{
use crate::db::datastore::{
error::{IndexError, SequenceError, TableError},
execution_context::ExecutionContext,
system_tables::{
with_sys_table_buf, StClientFields, StClientRow, StColumnFields, StColumnRow, StConstraintFields,
StConstraintRow, StFields as _, StIndexFields, StIndexRow, StRowLevelSecurityFields, StRowLevelSecurityRow,
StScheduledFields, StScheduledRow, StSequenceFields, StSequenceRow, StTableFields, StTableRow, SystemTable,
ST_CLIENT_ID, ST_COLUMN_ID, ST_CONSTRAINT_ID, ST_INDEX_ID, ST_ROW_LEVEL_SECURITY_ID, ST_SCHEDULED_ID,
ST_SEQUENCE_ID, ST_TABLE_ID,
},
};
use crate::execution_context::ExecutionContext;
use crate::execution_context::Workload;
use core::ops::RangeBounds;
use core::{cell::RefCell, mem};
use core::{iter, ops::Bound};
@@ -1,12 +1,10 @@
use super::mut_tx::{FilterDeleted, IndexScanRanged};
use super::{committed_state::CommittedState, datastore::Result, tx_state::TxState};
use crate::{
db::datastore::system_tables::{
StColumnFields, StColumnRow, StConstraintFields, StConstraintRow, StIndexFields, StIndexRow, StScheduledFields,
StScheduledRow, StSequenceFields, StSequenceRow, StTableFields, StTableRow, SystemTable, ST_COLUMN_ID,
ST_CONSTRAINT_ID, ST_INDEX_ID, ST_SCHEDULED_ID, ST_SEQUENCE_ID, ST_TABLE_ID,
},
error::TableError,
use crate::db::datastore::error::TableError;
use crate::db::datastore::system_tables::{
StColumnFields, StColumnRow, StConstraintFields, StConstraintRow, StIndexFields, StIndexRow, StScheduledFields,
StScheduledRow, StSequenceFields, StSequenceRow, StTableFields, StTableRow, SystemTable, ST_COLUMN_ID,
ST_CONSTRAINT_ID, ST_INDEX_ID, ST_SCHEDULED_ID, ST_SEQUENCE_ID, ST_TABLE_ID,
};
use core::ops::RangeBounds;
use spacetimedb_primitives::{ColList, TableId};
+3 -2
View File
@@ -1,7 +1,8 @@
pub mod error;
pub mod locking_tx_datastore;
pub mod system_tables;
pub mod traits;
use crate::error::DBError;
use error::DatastoreError;
pub type Result<T> = core::result::Result<T, DBError>;
pub type Result<T> = core::result::Result<T, DatastoreError>;
+23 -22
View File
@@ -11,7 +11,6 @@
//! - Use [`st_fields_enum`] to define its column enum.
//! - Register its schema in [`system_module_def`], making sure to call `validate_system_table` at the end of the function.
use crate::error::DBError;
use spacetimedb_lib::db::auth::{StAccess, StTableType};
use spacetimedb_lib::db::raw_def::v9::{btree, RawSql};
use spacetimedb_lib::db::raw_def::*;
@@ -37,6 +36,8 @@ use std::str::FromStr;
use strum::Display;
use v9::{RawModuleDefV9Builder, TableType};
use super::error::DatastoreError;
/// The static ID of the table that defines tables
pub(crate) const ST_TABLE_ID: TableId = TableId(1);
/// The static ID of the table that defines columns
@@ -490,8 +491,8 @@ pub struct StTableRow {
}
impl TryFrom<RowRef<'_>> for StTableRow {
type Error = DBError;
fn try_from(row: RowRef<'_>) -> Result<Self, DBError> {
type Error = DatastoreError;
fn try_from(row: RowRef<'_>) -> Result<Self, DatastoreError> {
read_via_bsatn(row)
}
}
@@ -544,8 +545,8 @@ pub struct StColumnRow {
}
impl TryFrom<RowRef<'_>> for StColumnRow {
type Error = DBError;
fn try_from(row: RowRef<'_>) -> Result<Self, DBError> {
type Error = DatastoreError;
fn try_from(row: RowRef<'_>) -> Result<Self, DatastoreError> {
read_via_bsatn(row)
}
}
@@ -622,8 +623,8 @@ impl From<StIndexAlgorithm> for IndexAlgorithm {
}
impl TryFrom<RowRef<'_>> for StIndexRow {
type Error = DBError;
fn try_from(row: RowRef<'_>) -> Result<Self, DBError> {
type Error = DatastoreError;
fn try_from(row: RowRef<'_>) -> Result<Self, DatastoreError> {
read_via_bsatn(row)
}
}
@@ -676,8 +677,8 @@ pub struct StSequenceRow {
}
impl TryFrom<RowRef<'_>> for StSequenceRow {
type Error = DBError;
fn try_from(row: RowRef<'_>) -> Result<Self, DBError> {
type Error = DatastoreError;
fn try_from(row: RowRef<'_>) -> Result<Self, DatastoreError> {
read_via_bsatn(row)
}
}
@@ -745,8 +746,8 @@ impl From<ConstraintData> for StConstraintData {
}
impl TryFrom<RowRef<'_>> for StConstraintRow {
type Error = DBError;
fn try_from(row: RowRef<'_>) -> Result<Self, DBError> {
type Error = DatastoreError;
fn try_from(row: RowRef<'_>) -> Result<Self, DatastoreError> {
read_via_bsatn(row)
}
}
@@ -784,8 +785,8 @@ pub struct StRowLevelSecurityRow {
}
impl TryFrom<RowRef<'_>> for StRowLevelSecurityRow {
type Error = DBError;
fn try_from(row: RowRef<'_>) -> Result<Self, DBError> {
type Error = DatastoreError;
fn try_from(row: RowRef<'_>) -> Result<Self, DatastoreError> {
read_via_bsatn(row)
}
}
@@ -871,7 +872,7 @@ pub struct StModuleRow {
}
/// Read bytes directly from the column `col` in `row`.
pub fn read_bytes_from_col(row: RowRef<'_>, col: impl StFields) -> Result<Box<[u8]>, DBError> {
pub fn read_bytes_from_col(row: RowRef<'_>, col: impl StFields) -> Result<Box<[u8]>, DatastoreError> {
let bytes = row.read_col::<ArrayValue>(col.col_id())?;
if let ArrayValue::U8(bytes) = bytes {
Ok(bytes)
@@ -887,19 +888,19 @@ pub fn read_bytes_from_col(row: RowRef<'_>, col: impl StFields) -> Result<Box<[u
/// Read an [`Identity`] directly from the column `col` in `row`.
///
/// The [`Identity`] is assumed to be stored as a flat byte array.
pub fn read_identity_from_col(row: RowRef<'_>, col: impl StFields) -> Result<Identity, DBError> {
pub fn read_identity_from_col(row: RowRef<'_>, col: impl StFields) -> Result<Identity, DatastoreError> {
Ok(Identity::from_u256(row.read_col(col.col_id())?))
}
/// Read a [`Hash`] directly from the column `col` in `row`.
///
/// The [`Hash`] is assumed to be stored as a flat byte array.
pub fn read_hash_from_col(row: RowRef<'_>, col: impl StFields) -> Result<Hash, DBError> {
pub fn read_hash_from_col(row: RowRef<'_>, col: impl StFields) -> Result<Hash, DatastoreError> {
Ok(Hash::from_u256(row.read_col(col.col_id())?))
}
impl TryFrom<RowRef<'_>> for StModuleRow {
type Error = DBError;
type Error = DatastoreError;
fn try_from(row: RowRef<'_>) -> Result<Self, Self::Error> {
read_via_bsatn(row)
@@ -936,7 +937,7 @@ impl From<&StClientRow> for ProductValue {
}
impl TryFrom<RowRef<'_>> for StClientRow {
type Error = DBError;
type Error = DatastoreError;
fn try_from(row: RowRef<'_>) -> Result<Self, Self::Error> {
read_via_bsatn(row)
@@ -1036,7 +1037,7 @@ impl StVarName {
}
impl TryFrom<RowRef<'_>> for StVarRow {
type Error = DBError;
type Error = DatastoreError;
fn try_from(row: RowRef<'_>) -> Result<Self, Self::Error> {
// The position of the `value` column in `st_var`
@@ -1073,8 +1074,8 @@ pub struct StScheduledRow {
}
impl TryFrom<RowRef<'_>> for StScheduledRow {
type Error = DBError;
fn try_from(row: RowRef<'_>) -> Result<Self, DBError> {
type Error = DatastoreError;
fn try_from(row: RowRef<'_>) -> Result<Self, DatastoreError> {
read_via_bsatn(row)
}
}
@@ -1110,7 +1111,7 @@ pub(crate) fn with_sys_table_buf<R>(run: impl FnOnce(&mut Vec<u8>) -> R) -> R {
}
/// Read a value from a system table via BSATN.
fn read_via_bsatn<T: DeserializeOwned>(row: RowRef<'_>) -> Result<T, DBError> {
fn read_via_bsatn<T: DeserializeOwned>(row: RowRef<'_>) -> Result<T, DatastoreError> {
with_sys_table_buf(|buf| Ok(row.read_via_bsatn::<T>(buf)?))
}
+53 -45
View File
@@ -1,3 +1,4 @@
use super::datastore::error::{DatastoreError, TableError};
use super::datastore::locking_tx_datastore::committed_state::CommittedState;
use super::datastore::locking_tx_datastore::datastore::TxMetrics;
use super::datastore::locking_tx_datastore::state_view::{
@@ -17,7 +18,7 @@ use super::datastore::{
};
use super::db_metrics::DB_METRICS;
use crate::db::datastore::system_tables::{StModuleRow, WASM_MODULE};
use crate::error::{DBError, DatabaseError, RestoreSnapshotError, TableError};
use crate::error::{DBError, DatabaseError, RestoreSnapshotError};
use crate::execution_context::{ReducerContext, Workload, WorkloadType};
use crate::messages::control_db::HostType;
use crate::subscription::ExecutionCounters;
@@ -433,14 +434,14 @@ impl RelationalDB {
program_bytes: program.bytes,
module_version: ONLY_MODULE_VERSION.into(),
};
tx.insert_via_serialize_bsatn(ST_MODULE_ID, &row).map(drop)
Ok(tx.insert_via_serialize_bsatn(ST_MODULE_ID, &row).map(drop)?)
}
/// Obtain the [`Metadata`] of this database.
///
/// `None` if the database is not yet fully initialized.
pub fn metadata(&self) -> Result<Option<Metadata>, DBError> {
self.with_read_only(Workload::Internal, |tx| self.inner.metadata(tx))
Ok(self.with_read_only(Workload::Internal, |tx| self.inner.metadata(tx))?)
}
/// Obtain the module associated with this database.
@@ -448,12 +449,17 @@ impl RelationalDB {
/// `None` if the database is not yet fully initialized.
/// Note that a `Some` result may yield an empty slice.
pub fn program(&self) -> Result<Option<Program>, DBError> {
self.with_read_only(Workload::Internal, |tx| self.inner.program(tx))
Ok(self.with_read_only(Workload::Internal, |tx| self.inner.program(tx))?)
}
/// Read the set of clients currently connected to the database.
pub fn connected_clients(&self) -> Result<ConnectedClients, DBError> {
self.with_read_only(Workload::Internal, |tx| self.inner.connected_clients(tx)?.collect())
self.with_read_only(Workload::Internal, |tx| {
self.inner
.connected_clients(tx)?
.collect::<Result<ConnectedClients, _>>()
})
.map_err(DBError::from)
}
/// Update the module associated with this database.
@@ -470,7 +476,7 @@ impl RelationalDB {
let program_kind = match host_type {
HostType::Wasm => WASM_MODULE,
};
self.inner.update_program(tx, program_kind, program)
Ok(self.inner.update_program(tx, program_kind, program)?)
}
fn restore_from_snapshot_or_bootstrap(
@@ -527,6 +533,7 @@ impl RelationalDB {
e
)
})
.map_err(DBError::from)
.map_err(Box::new)
}
@@ -625,6 +632,7 @@ impl RelationalDB {
}
Locking::bootstrap(database_identity, page_pool)
.map_err(DBError::from)
.map_err(Box::new)
.map_err(RestoreSnapshotError::Bootstrap)
}
@@ -685,11 +693,11 @@ impl RelationalDB {
}
pub fn schema_for_table_mut(&self, tx: &MutTx, table_id: TableId) -> Result<Arc<TableSchema>, DBError> {
self.inner.schema_for_table_mut_tx(tx, table_id)
Ok(self.inner.schema_for_table_mut_tx(tx, table_id)?)
}
pub fn schema_for_table(&self, tx: &Tx, table_id: TableId) -> Result<Arc<TableSchema>, DBError> {
self.inner.schema_for_table_tx(tx, table_id)
Ok(self.inner.schema_for_table_tx(tx, table_id)?)
}
pub fn row_schema_for_table<'tx>(
@@ -697,15 +705,15 @@ impl RelationalDB {
tx: &'tx MutTx,
table_id: TableId,
) -> Result<RowTypeForTable<'tx>, DBError> {
self.inner.row_type_for_table_mut_tx(tx, table_id)
Ok(self.inner.row_type_for_table_mut_tx(tx, table_id)?)
}
pub fn get_all_tables_mut(&self, tx: &MutTx) -> Result<Vec<Arc<TableSchema>>, DBError> {
self.inner.get_all_tables_mut_tx(tx)
Ok(self.inner.get_all_tables_mut_tx(tx)?)
}
pub fn get_all_tables(&self, tx: &Tx) -> Result<Vec<Arc<TableSchema>>, DBError> {
self.inner.get_all_tables_tx(tx)
Ok(self.inner.get_all_tables_tx(tx)?)
}
pub fn table_scheduled_id_and_at(
@@ -734,7 +742,7 @@ impl RelationalDB {
let check_bounds = |schema: &ProductType| -> Result<_, DBError> {
let col_idx = col_id.idx();
if col_idx >= schema.elements.len() {
return Err(TableError::ColumnNotFound(col_id).into());
return Err(DatastoreError::Table(TableError::ColumnNotFound(col_id)).into());
}
Ok(col_idx)
};
@@ -1022,7 +1030,7 @@ impl RelationalDB {
}
pub(crate) fn alter_table_access(&self, tx: &mut MutTx, name: Box<str>, access: StAccess) -> Result<(), DBError> {
self.inner.alter_table_access_mut_tx(tx, name, access)
Ok(self.inner.alter_table_access_mut_tx(tx, name, access)?)
}
/// Reports the `TxMetrics`s passed.
@@ -1044,7 +1052,7 @@ impl RelationalDB {
impl RelationalDB {
pub fn create_table(&self, tx: &mut MutTx, schema: TableSchema) -> Result<TableId, DBError> {
self.inner.create_table_mut_tx(tx, schema)
Ok(self.inner.create_table_mut_tx(tx, schema)?)
}
pub fn create_table_for_test_with_the_works(
@@ -1129,12 +1137,12 @@ impl RelationalDB {
.table_name_from_id_mut(tx, table_id)?
.map(|name| name.to_string())
.unwrap_or_default();
self.inner.drop_table_mut_tx(tx, table_id).map(|_| {
Ok(self.inner.drop_table_mut_tx(tx, table_id).map(|_| {
DB_METRICS
.rdb_num_table_rows
.with_label_values(&self.database_identity, &table_id.into(), &table_name)
.set(0)
})
})?)
}
/// Rename a table.
@@ -1144,15 +1152,15 @@ impl RelationalDB {
///
/// If the table is not found or is a system table, an error is returned.
pub fn rename_table(&self, tx: &mut MutTx, table_id: TableId, new_name: &str) -> Result<(), DBError> {
self.inner.rename_table_mut_tx(tx, table_id, new_name)
Ok(self.inner.rename_table_mut_tx(tx, table_id, new_name)?)
}
pub fn table_id_from_name_mut(&self, tx: &MutTx, table_name: &str) -> Result<Option<TableId>, DBError> {
self.inner.table_id_from_name_mut_tx(tx, table_name)
Ok(self.inner.table_id_from_name_mut_tx(tx, table_name)?)
}
pub fn table_id_from_name(&self, tx: &Tx, table_name: &str) -> Result<Option<TableId>, DBError> {
self.inner.table_id_from_name_tx(tx, table_name)
Ok(self.inner.table_id_from_name_tx(tx, table_name)?)
}
pub fn table_id_exists(&self, tx: &Tx, table_id: &TableId) -> bool {
@@ -1164,7 +1172,7 @@ impl RelationalDB {
}
pub fn table_name_from_id<'a>(&'a self, tx: &'a Tx, table_id: TableId) -> Result<Option<Cow<'a, str>>, DBError> {
self.inner.table_name_from_id_tx(tx, table_id)
Ok(self.inner.table_name_from_id_tx(tx, table_id)?)
}
pub fn table_name_from_id_mut<'a>(
@@ -1172,11 +1180,11 @@ impl RelationalDB {
tx: &'a MutTx,
table_id: TableId,
) -> Result<Option<Cow<'a, str>>, DBError> {
self.inner.table_name_from_id_mut_tx(tx, table_id)
Ok(self.inner.table_name_from_id_mut_tx(tx, table_id)?)
}
pub fn index_id_from_name_mut(&self, tx: &MutTx, index_name: &str) -> Result<Option<IndexId>, DBError> {
self.inner.index_id_from_name_mut_tx(tx, index_name)
Ok(self.inner.index_id_from_name_mut_tx(tx, index_name)?)
}
pub fn table_row_count_mut(&self, tx: &MutTx, table_id: TableId) -> Option<u64> {
@@ -1212,27 +1220,27 @@ impl RelationalDB {
}
pub fn index_id_from_name(&self, tx: &MutTx, index_name: &str) -> Result<Option<IndexId>, DBError> {
self.inner.index_id_from_name_mut_tx(tx, index_name)
Ok(self.inner.index_id_from_name_mut_tx(tx, index_name)?)
}
pub fn sequence_id_from_name(&self, tx: &MutTx, sequence_name: &str) -> Result<Option<SequenceId>, DBError> {
self.inner.sequence_id_from_name_mut_tx(tx, sequence_name)
Ok(self.inner.sequence_id_from_name_mut_tx(tx, sequence_name)?)
}
pub fn constraint_id_from_name(&self, tx: &MutTx, constraint_name: &str) -> Result<Option<ConstraintId>, DBError> {
self.inner.constraint_id_from_name(tx, constraint_name)
Ok(self.inner.constraint_id_from_name(tx, constraint_name)?)
}
/// Adds the index into the [ST_INDEXES_NAME] table
///
/// NOTE: It loads the data from the table into it before returning
pub fn create_index(&self, tx: &mut MutTx, schema: IndexSchema, is_unique: bool) -> Result<IndexId, DBError> {
self.inner.create_index_mut_tx(tx, schema, is_unique)
Ok(self.inner.create_index_mut_tx(tx, schema, is_unique)?)
}
/// Removes the [`TableIndex`] from the database by their `index_id`
pub fn drop_index(&self, tx: &mut MutTx, index_id: IndexId) -> Result<(), DBError> {
self.inner.drop_index_mut_tx(tx, index_id)
Ok(self.inner.drop_index_mut_tx(tx, index_id)?)
}
pub fn create_row_level_security(
@@ -1240,11 +1248,11 @@ impl RelationalDB {
tx: &mut MutTx,
row_level_security_schema: RowLevelSecuritySchema,
) -> Result<RawSql, DBError> {
tx.create_row_level_security(row_level_security_schema)
Ok(tx.create_row_level_security(row_level_security_schema)?)
}
pub fn drop_row_level_security(&self, tx: &mut MutTx, sql: RawSql) -> Result<(), DBError> {
tx.drop_row_level_security(sql)
Ok(tx.drop_row_level_security(sql)?)
}
pub fn row_level_security_for_table_id_mut_tx(
@@ -1252,17 +1260,17 @@ impl RelationalDB {
tx: &mut MutTx,
table_id: TableId,
) -> Result<Vec<RowLevelSecuritySchema>, DBError> {
tx.row_level_security_for_table_id(table_id)
Ok(tx.row_level_security_for_table_id(table_id)?)
}
/// Returns an iterator,
/// yielding every row in the table identified by `table_id`.
pub fn iter_mut<'a>(&'a self, tx: &'a MutTx, table_id: TableId) -> Result<IterMutTx<'a>, DBError> {
self.inner.iter_mut_tx(tx, table_id)
Ok(self.inner.iter_mut_tx(tx, table_id)?)
}
pub fn iter<'a>(&'a self, tx: &'a Tx, table_id: TableId) -> Result<IterTx<'a>, DBError> {
self.inner.iter_tx(tx, table_id)
Ok(self.inner.iter_tx(tx, table_id)?)
}
/// Returns an iterator,
@@ -1277,7 +1285,7 @@ impl RelationalDB {
cols: impl Into<ColList>,
value: &'r AlgebraicValue,
) -> Result<IterByColEqMutTx<'a, 'r>, DBError> {
self.inner.iter_by_col_eq_mut_tx(tx, table_id.into(), cols, value)
Ok(self.inner.iter_by_col_eq_mut_tx(tx, table_id.into(), cols, value)?)
}
pub fn iter_by_col_eq<'a, 'r>(
@@ -1287,7 +1295,7 @@ impl RelationalDB {
cols: impl Into<ColList>,
value: &'r AlgebraicValue,
) -> Result<IterByColEqTx<'a, 'r>, DBError> {
self.inner.iter_by_col_eq_tx(tx, table_id.into(), cols, value)
Ok(self.inner.iter_by_col_eq_tx(tx, table_id.into(), cols, value)?)
}
/// Returns an iterator,
@@ -1302,7 +1310,7 @@ impl RelationalDB {
cols: impl Into<ColList>,
range: R,
) -> Result<IterByColRangeMutTx<'a, R>, DBError> {
self.inner.iter_by_col_range_mut_tx(tx, table_id.into(), cols, range)
Ok(self.inner.iter_by_col_range_mut_tx(tx, table_id.into(), cols, range)?)
}
/// Returns an iterator,
@@ -1317,7 +1325,7 @@ impl RelationalDB {
cols: impl Into<ColList>,
range: R,
) -> Result<IterByColRangeTx<'a, R>, DBError> {
self.inner.iter_by_col_range_tx(tx, table_id.into(), cols, range)
Ok(self.inner.iter_by_col_range_tx(tx, table_id.into(), cols, range)?)
}
pub fn index_scan_range<'a>(
@@ -1329,7 +1337,7 @@ impl RelationalDB {
rstart: &[u8],
rend: &[u8],
) -> Result<(TableId, impl Iterator<Item = RowRef<'a>>), DBError> {
tx.index_scan_range(index_id, prefix, prefix_elems, rstart, rend)
Ok(tx.index_scan_range(index_id, prefix, prefix_elems, rstart, rend)?)
}
pub fn insert<'a>(
@@ -1338,7 +1346,7 @@ impl RelationalDB {
table_id: TableId,
row: &[u8],
) -> Result<(ColList, RowRef<'a>, InsertFlags), DBError> {
self.inner.insert_mut_tx(tx, table_id, row)
Ok(self.inner.insert_mut_tx(tx, table_id, row)?)
}
pub fn update<'a>(
@@ -1348,7 +1356,7 @@ impl RelationalDB {
index_id: IndexId,
row: &[u8],
) -> Result<(ColList, RowRef<'a>, UpdateFlags), DBError> {
self.inner.update_mut_tx(tx, table_id, index_id, row)
Ok(self.inner.update_mut_tx(tx, table_id, index_id, row)?)
}
pub fn delete(&self, tx: &mut MutTx, table_id: TableId, row_ids: impl IntoIterator<Item = RowPointer>) -> u32 {
@@ -1375,17 +1383,17 @@ impl RelationalDB {
}
pub fn create_sequence(&self, tx: &mut MutTx, sequence_schema: SequenceSchema) -> Result<SequenceId, DBError> {
self.inner.create_sequence_mut_tx(tx, sequence_schema)
Ok(self.inner.create_sequence_mut_tx(tx, sequence_schema)?)
}
///Removes the [Sequence] from database instance
pub fn drop_sequence(&self, tx: &mut MutTx, seq_id: SequenceId) -> Result<(), DBError> {
self.inner.drop_sequence_mut_tx(tx, seq_id)
Ok(self.inner.drop_sequence_mut_tx(tx, seq_id)?)
}
///Removes the [Constraints] from database instance
pub fn drop_constraint(&self, tx: &mut MutTx, constraint_id: ConstraintId) -> Result<(), DBError> {
self.inner.drop_constraint_mut_tx(tx, constraint_id)
Ok(self.inner.drop_constraint_mut_tx(tx, constraint_id)?)
}
/// Reports the metrics for `reducer`, using counters provided by `db`.
@@ -1905,7 +1913,7 @@ pub mod tests_utils {
}
pub fn take_snapshot(&self, repo: &SnapshotRepository) -> Result<Option<SnapshotDirPath>, DBError> {
self.inner.take_snapshot(repo)
Ok(self.inner.take_snapshot(repo)?)
}
}
@@ -2020,6 +2028,7 @@ mod tests {
use super::tests_utils::begin_mut_tx;
use super::*;
use crate::db::datastore::error::{DatastoreError, IndexError};
use crate::db::datastore::system_tables::{
system_tables, StConstraintRow, StIndexRow, StSequenceRow, StTableRow, ST_CONSTRAINT_ID, ST_INDEX_ID,
ST_SEQUENCE_ID, ST_TABLE_ID,
@@ -2027,7 +2036,6 @@ mod tests {
use crate::db::relational_db::tests_utils::{
begin_tx, insert, make_snapshot, with_auto_commit, with_read_only, TestDB,
};
use crate::error::IndexError;
use crate::execution_context::ReducerContext;
use anyhow::bail;
use bytes::Bytes;
@@ -2476,7 +2484,7 @@ mod tests {
insert(&stdb, &mut tx, table_id, &product![1i64, 0i64]).expect("stdb.insert failed");
match insert(&stdb, &mut tx, table_id, &product![1i64, 1i64]) {
Ok(_) => panic!("Allow to insert duplicate row"),
Err(DBError::Index(IndexError::UniqueConstraintViolation { .. })) => {}
Err(DBError::Datastore(DatastoreError::Index(IndexError::UniqueConstraintViolation { .. }))) => {}
Err(err) => panic!("Expected error `UniqueConstraintViolation`, got {err}"),
}
+15 -131
View File
@@ -8,93 +8,23 @@ use hex::FromHexError;
use spacetimedb_commitlog::repo::TxOffset;
use spacetimedb_expr::errors::TypingError;
use spacetimedb_lib::Identity;
use spacetimedb_sats::AlgebraicType;
use spacetimedb_schema::error::ValidationErrors;
use spacetimedb_snapshot::SnapshotError;
use spacetimedb_table::table::{self, ReadViaBsatnError, UniqueConstraintViolation};
use spacetimedb_table::{bflatn_to, read_column};
use spacetimedb_table::table::ReadViaBsatnError;
use thiserror::Error;
use crate::client::ClientActorId;
use crate::db::datastore::system_tables::SystemTable;
use crate::host::scheduler::ScheduleError;
use spacetimedb_lib::buffer::DecodeError;
use spacetimedb_lib::db::error::{LibError, RelationError, SchemaErrors};
use spacetimedb_lib::db::raw_def::v9::RawSql;
use spacetimedb_lib::db::raw_def::RawIndexDefV8;
use spacetimedb_lib::relation::FieldName;
use spacetimedb_primitives::*;
use spacetimedb_sats::hash::Hash;
use spacetimedb_sats::product_value::InvalidFieldError;
use spacetimedb_sats::satn::Satn;
use spacetimedb_sats::{AlgebraicValue, ProductValue};
use spacetimedb_vm::errors::{ErrorKind, ErrorLang, ErrorType, ErrorVm};
use spacetimedb_vm::expr::Crud;
#[derive(Error, Debug, EnumAsInner)]
pub enum TableError {
#[error("Table with name `{0}` start with 'st_' and that is reserved for internal system tables.")]
System(Box<str>),
#[error("Table with name `{0}` already exists.")]
Exist(String),
#[error("Table with name `{0}` not found.")]
NotFound(String),
#[error("Table with ID `{1}` not found in `{0}`.")]
IdNotFound(SystemTable, u32),
#[error("Sql `{1}` not found in `{0}`.")]
RawSqlNotFound(SystemTable, RawSql),
#[error("Table with ID `{0}` not found in `TxState`.")]
IdNotFoundState(TableId),
#[error("Column `{0}.{1}` is missing a name")]
ColumnWithoutName(String, ColId),
#[error("schema_for_table: Table has invalid schema: {0} Err: {1}")]
InvalidSchema(TableId, LibError),
#[error("Row has invalid row type for table: {0} Err: {1}", table_id, row.to_satn())]
RowInvalidType { table_id: TableId, row: ProductValue },
#[error("failed to decode row in table")]
RowDecodeError(DecodeError),
#[error("Column with name `{0}` already exists")]
DuplicateColumnName(String),
#[error("Column `{0}` not found")]
ColumnNotFound(ColId),
#[error(
"DecodeError for field `{0}.{1}`, expect `{2}` but found `{3}`",
table,
field,
expect,
found
)]
DecodeField {
table: String,
field: Box<str>,
expect: String,
found: String,
},
#[error(transparent)]
Bflatn(#[from] bflatn_to::Error),
#[error(transparent)]
Duplicate(#[from] table::DuplicateError),
#[error(transparent)]
ReadColTypeError(#[from] read_column::TypeError),
}
#[derive(Error, Debug, PartialEq, Eq)]
pub enum IndexError {
#[error("Index not found: {0:?}")]
NotFound(IndexId),
#[error("Column not found: {0:?}")]
ColumnNotFound(RawIndexDefV8),
#[error(transparent)]
UniqueConstraintViolation(#[from] UniqueConstraintViolation),
#[error("Attempt to define a index with more than 1 auto_inc column: Table: {0:?}, Columns: {1:?}")]
OneAutoInc(TableId, Vec<String>),
#[error("Could not decode arguments to index scan")]
Decode(DecodeError),
#[error("Index was not unique: {0:?}")]
NotUnique(IndexId),
#[error("Key {1:?} was not found in index {0:?}")]
KeyNotFound(IndexId, AlgebraicValue),
}
pub use crate::db::datastore::error::{DatastoreError, IndexError, SequenceError, TableError};
#[derive(Error, Debug, PartialEq, Eq)]
pub enum ClientError {
@@ -152,42 +82,16 @@ pub enum DatabaseError {
DatabasedOpened(PathBuf, anyhow::Error),
}
#[derive(Error, Debug, PartialEq, Eq)]
pub enum SequenceError {
#[error("Sequence with name `{0}` already exists.")]
Exist(String),
#[error("Sequence `{0}`: The increment is 0, and this means the sequence can't advance.")]
IncrementIsZero(String),
#[error("Sequence `{0}`: The min_value {1} must < max_value {2}.")]
MinMax(String, i128, i128),
#[error("Sequence `{0}`: The start value {1} must be >= min_value {2}.")]
MinStart(String, i128, i128),
#[error("Sequence `{0}`: The start value {1} must be <= min_value {2}.")]
MaxStart(String, i128, i128),
#[error("Sequence `{0}` failed to decode value from Sled (not a u128).")]
SequenceValue(String),
#[error("Sequence ID `{0}` not found.")]
NotFound(SequenceId),
#[error("Sequence applied to a non-integer field. Column `{col}` is of type {{found.to_sats()}}.")]
NotInteger { col: String, found: AlgebraicType },
#[error("Sequence ID `{0}` still had no values left after allocation.")]
UnableToAllocate(SequenceId),
#[error("Autoinc constraint on table {0:?} spans more than one column: {1:?}")]
MultiColumnAutoInc(TableId, ColList),
}
#[derive(Error, Debug, EnumAsInner)]
pub enum DBError {
#[error("LibError: {0}")]
Lib(#[from] LibError),
#[error("BufferError: {0}")]
Buffer(#[from] DecodeError),
#[error("TableError: {0}")]
Table(#[from] TableError),
#[error("DatastoreError: {0}")]
Datastore(#[from] DatastoreError),
#[error("SequenceError: {0}")]
Sequence2(#[from] SequenceError),
#[error("IndexError: {0}")]
Index(#[from] IndexError),
#[error("SchemaError: {0}")]
Schema(SchemaErrors),
#[error("IOError: {0}.")]
@@ -242,28 +146,6 @@ pub enum DBError {
RestoreSnapshot(#[from] RestoreSnapshotError),
}
impl From<bflatn_to::Error> for DBError {
fn from(err: bflatn_to::Error) -> Self {
Self::Table(err.into())
}
}
impl From<table::InsertError> for DBError {
fn from(err: table::InsertError) -> Self {
match err {
table::InsertError::Duplicate(e) => TableError::from(e).into(),
table::InsertError::Bflatn(e) => TableError::from(e).into(),
table::InsertError::IndexError(e) => IndexError::from(e).into(),
}
}
}
impl From<SnapshotError> for DBError {
fn from(e: SnapshotError) -> Self {
DBError::Snapshot(Box::new(e))
}
}
impl DBError {
pub fn get_auth_error(&self) -> Option<&ErrorLang> {
if let Self::VmUser(err) = self {
@@ -289,7 +171,7 @@ impl From<InvalidFieldError> for DBError {
impl From<spacetimedb_table::read_column::TypeError> for DBError {
fn from(err: spacetimedb_table::read_column::TypeError) -> Self {
TableError::from(err).into()
DatastoreError::Table(TableError::from(err)).into()
}
}
@@ -395,14 +277,16 @@ pub enum NodesError {
impl From<DBError> for NodesError {
fn from(e: DBError) -> Self {
match e {
DBError::Table(TableError::Exist(name)) => Self::AlreadyExists(name),
DBError::Table(TableError::System(name)) => Self::SystemName(name),
DBError::Table(TableError::IdNotFound(_, _) | TableError::NotFound(_)) => Self::TableNotFound,
DBError::Table(TableError::ColumnNotFound(_)) => Self::BadColumn,
DBError::Index(IndexError::NotFound(_)) => Self::IndexNotFound,
DBError::Index(IndexError::Decode(e)) => Self::DecodeRow(e),
DBError::Index(IndexError::NotUnique(_)) => Self::IndexNotUnique,
DBError::Index(IndexError::KeyNotFound(..)) => Self::IndexRowNotFound,
DBError::Datastore(DatastoreError::Table(TableError::Exist(name))) => Self::AlreadyExists(name),
DBError::Datastore(DatastoreError::Table(TableError::System(name))) => Self::SystemName(name),
DBError::Datastore(
DatastoreError::Table(TableError::IdNotFound(_, _)) | DatastoreError::Table(TableError::NotFound(_)),
) => Self::TableNotFound,
DBError::Datastore(DatastoreError::Table(TableError::ColumnNotFound(_))) => Self::BadColumn,
DBError::Datastore(DatastoreError::Index(IndexError::NotFound(_))) => Self::IndexNotFound,
DBError::Datastore(DatastoreError::Index(IndexError::Decode(e))) => Self::DecodeRow(e),
DBError::Datastore(DatastoreError::Index(IndexError::NotUnique(_))) => Self::IndexNotUnique,
DBError::Datastore(DatastoreError::Index(IndexError::KeyNotFound(..))) => Self::IndexRowNotFound,
_ => Self::Internal(Box::new(e)),
}
}
+4 -4
View File
@@ -2,7 +2,7 @@ use super::scheduler::{get_schedule_from_row, ScheduleError, Scheduler};
use crate::database_logger::{BacktraceProvider, LogLevel, Record};
use crate::db::datastore::locking_tx_datastore::MutTxId;
use crate::db::relational_db::{MutTx, RelationalDB};
use crate::error::{DBError, IndexError, NodesError};
use crate::error::{DBError, DatastoreError, IndexError, NodesError};
use crate::replica_context::ReplicaContext;
use core::mem;
use parking_lot::{Mutex, MutexGuard};
@@ -222,7 +222,7 @@ impl InstanceEnv {
#[cold]
#[inline(never)]
|e| match e {
DBError::Index(IndexError::UniqueConstraintViolation(_)) => {}
DBError::Datastore(DatastoreError::Index(IndexError::UniqueConstraintViolation(_))) => {}
_ => {
let res = stdb.table_name_from_id_mut(tx, table_id);
if let Ok(Some(table_name)) = res {
@@ -258,7 +258,7 @@ impl InstanceEnv {
.table_scheduled_id_and_at(tx, table_id)?
.expect("schedule_row should only be called when we know its a scheduler table");
let row_ref = tx.get(table_id, row_ptr)?.unwrap();
let row_ref = tx.get(table_id, row_ptr).map_err(DBError::from)?.unwrap();
let (schedule_id, schedule_at) = get_schedule_from_row(&row_ref, id_column, at_column)
// NOTE(centril): Should never happen,
// as we successfully inserted and thus `ret` is verified against the table schema.
@@ -291,7 +291,7 @@ impl InstanceEnv {
#[cold]
#[inline(never)]
|e| match e {
DBError::Index(IndexError::UniqueConstraintViolation(_)) => {}
DBError::Datastore(DatastoreError::Index(IndexError::UniqueConstraintViolation(_))) => {}
_ => {
let res = stdb.table_name_from_id_mut(tx, table_id);
if let Ok(Some(table_name)) = res {
+7 -2
View File
@@ -740,13 +740,16 @@ impl ModuleHost {
let stdb = self.inner.replica_ctx().relational_db.clone();
asyncify(move || {
stdb.with_auto_commit(workload, |mut_tx| {
mut_tx.insert_st_client(caller_identity, caller_connection_id)
mut_tx
.insert_st_client(caller_identity, caller_connection_id)
.map_err(DBError::from)
})
})
.await
.inspect_err(|e| {
log::error!("`call_identity_connected`: fallback transaction to insert into `st_client` failed: {e:#?}")
})
.map_err(DBError::from)
.map_err(Into::into)
}
}
@@ -791,7 +794,9 @@ impl ModuleHost {
let database_identity = self.info.database_identity;
asyncify(move || {
stdb.with_auto_commit(workload, |mut_tx| {
mut_tx.delete_st_client(caller_identity, caller_connection_id, database_identity)
mut_tx
.delete_st_client(caller_identity, caller_connection_id, database_identity)
.map_err(DBError::from)
})
})
.await
+9 -7
View File
@@ -7,7 +7,7 @@ use std::num::NonZeroU16;
use std::time::Instant;
use super::{scheduler::ScheduleError, AbiCall};
use crate::error::{DBError, IndexError, NodesError};
use crate::error::{DBError, DatastoreError, IndexError, NodesError};
use spacetimedb_primitives::errno;
use spacetimedb_sats::typespace::TypeRefError;
use spacetimedb_table::table::UniqueConstraintViolation;
@@ -348,12 +348,14 @@ pub fn err_to_errno(err: &NodesError) -> Option<NonZeroU16> {
NodesError::ScheduleError(ScheduleError::DelayTooLong(_)) => Some(errno::SCHEDULE_AT_DELAY_TOO_LONG),
NodesError::AlreadyExists(_) => Some(errno::UNIQUE_ALREADY_EXISTS),
NodesError::Internal(internal) => match **internal {
DBError::Index(IndexError::UniqueConstraintViolation(UniqueConstraintViolation {
constraint_name: _,
table_name: _,
cols: _,
value: _,
})) => Some(errno::UNIQUE_ALREADY_EXISTS),
DBError::Datastore(DatastoreError::Index(IndexError::UniqueConstraintViolation(
UniqueConstraintViolation {
constraint_name: _,
table_name: _,
cols: _,
value: _,
},
))) => Some(errno::UNIQUE_ALREADY_EXISTS),
_ => None,
},
_ => None,