use anyhow::Result; use core::hash::{Hash, Hasher}; use core::ops::RangeBounds; use spacetimedb_lib::query::Delta; use spacetimedb_physical_plan::plan::{ProjectField, TupleField}; use spacetimedb_primitives::{ColList, IndexId, TableId}; use spacetimedb_sats::bsatn::{BufReservedFill, EncodeError, ToBsatn}; use spacetimedb_sats::buffer::BufWriter; use spacetimedb_sats::product_value::InvalidFieldError; use spacetimedb_sats::{impl_serialize, AlgebraicValue, ProductValue}; use spacetimedb_table::{static_assert_size, table::RowRef}; pub mod dml; pub mod pipelined; pub trait Datastore { /// Iterator type for table scans type TableIter<'a>: Iterator> + 'a where Self: 'a; /// Iterator type for ranged index scans. type RangeIndexIter<'a>: Iterator> + 'a where Self: 'a; /// Iterator type for point index scans. type PointIndexIter<'a>: Iterator> + 'a where Self: 'a; /// Returns the number of rows in this table fn row_count(&self, table_id: TableId) -> u64; /// Scans and returns all of the rows in a table fn table_scan<'a>(&'a self, table_id: TableId) -> Result>; /// Scans a range of keys from an index returning a [`RowRef`] iterator. fn index_scan_range<'a>( &'a self, table_id: TableId, index_id: IndexId, range: &impl RangeBounds, ) -> Result>; /// Scans a key from an index returning a [`RowRef`] iterator. fn index_scan_point<'a>( &'a self, table_id: TableId, index_id: IndexId, point: &AlgebraicValue, ) -> Result>; } pub trait DeltaStore { fn num_inserts(&self, table_id: TableId) -> usize; fn num_deletes(&self, table_id: TableId) -> usize; fn has_inserts(&self, table_id: TableId) -> bool { self.num_inserts(table_id) != 0 } fn has_deletes(&self, table_id: TableId) -> bool { self.num_deletes(table_id) != 0 } fn inserts_for_table(&self, table_id: TableId) -> Option>; fn deletes_for_table(&self, table_id: TableId) -> Option>; fn index_scan_range_for_delta( &self, table_id: TableId, index_id: IndexId, delta: Delta, range: impl RangeBounds, ) -> impl Iterator>; fn index_scan_point_for_delta( &self, table_id: TableId, index_id: IndexId, delta: Delta, point: &AlgebraicValue, ) -> impl Iterator>; fn delta_scan(&self, table_id: TableId, inserts: bool) -> DeltaScanIter<'_> { match inserts { true => DeltaScanIter { iter: self.inserts_for_table(table_id), }, false => DeltaScanIter { iter: self.deletes_for_table(table_id), }, } } } #[derive(Clone)] pub enum Row<'a> { Ptr(RowRef<'a>), Ref(&'a ProductValue), } impl PartialEq for Row<'_> { fn eq(&self, other: &Self) -> bool { match (self, other) { (Self::Ptr(x), Self::Ptr(y)) => x == y, (Self::Ref(x), Self::Ref(y)) => x == y, (Self::Ptr(x), Self::Ref(y)) => x == *y, (Self::Ref(x), Self::Ptr(y)) => y == *x, } } } impl Eq for Row<'_> {} impl Hash for Row<'_> { fn hash(&self, state: &mut H) { match self { Self::Ptr(x) => x.hash(state), Self::Ref(x) => x.hash(state), } } } impl Row<'_> { pub fn to_product_value(&self) -> ProductValue { match self { Self::Ptr(ptr) => ptr.to_product_value(), Self::Ref(val) => (*val).clone(), } } pub fn project_product(self, cols: &ColList) -> Result { match self { Self::Ptr(ptr) => ptr.project_product(cols), Self::Ref(val) => val.project_product(cols), } } } impl_serialize!(['a] Row<'a>, (self, ser) => match self { Self::Ptr(row) => row.serialize(ser), Self::Ref(row) => row.serialize(ser), }); impl ToBsatn for Row<'_> { fn static_bsatn_size(&self) -> Option { match self { Self::Ptr(ptr) => ptr.static_bsatn_size(), Self::Ref(val) => val.static_bsatn_size(), } } fn to_bsatn_extend(&self, buf: &mut (impl BufWriter + BufReservedFill)) -> std::result::Result<(), EncodeError> { match self { Self::Ptr(ptr) => ptr.to_bsatn_extend(buf), Self::Ref(val) => val.to_bsatn_extend(buf), } } fn to_bsatn_vec(&self) -> std::result::Result, EncodeError> { match self { Self::Ptr(ptr) => ptr.to_bsatn_vec(), Self::Ref(val) => val.to_bsatn_vec(), } } } impl ProjectField for Row<'_> { fn project(&self, field: &TupleField) -> AlgebraicValue { match self { Self::Ptr(ptr) => ptr.project(field), Self::Ref(val) => val.project(field), } } } /// Each query operator returns a tuple of [RowRef]s #[derive(Clone)] pub enum Tuple<'a> { /// A pointer to a row in a base table Row(Row<'a>), /// A temporary returned by a join operator Join(Vec>), } static_assert_size!(Tuple, 40); impl ProjectField for Tuple<'_> { fn project(&self, field: &TupleField) -> AlgebraicValue { match self { Self::Row(row) => row.project(field), Self::Join(ptrs) => field .label_pos .and_then(|i| ptrs.get(i)) .map(|ptr| ptr.project(field)) .unwrap(), } } } impl<'a> Tuple<'a> { /// Select the tuple element at position `i` fn select(self, i: usize) -> Option> { match self { Self::Row(_) => None, Self::Join(mut ptrs) => Some(ptrs.swap_remove(i)), } } /// Append a [Row] to a tuple fn append(self, ptr: Row<'a>) -> Self { match self { Self::Row(row) => Self::Join(vec![row, ptr]), Self::Join(mut rows) => { rows.push(ptr); Self::Join(rows) } } } fn join(self, with: Self) -> Self { match with { Self::Row(ptr) => self.append(ptr), Self::Join(ptrs) => ptrs.into_iter().fold(self, |tup, ptr| tup.append(ptr)), } } } pub struct DeltaScanIter<'a> { iter: Option>, } impl<'a> Iterator for DeltaScanIter<'a> { type Item = &'a ProductValue; fn next(&mut self) -> Option { self.iter.as_mut().and_then(|iter| iter.next()) } }