Bypass AlgebraicValue for datastore updates and bsatn based index scans + BytesKey optimization (#4311)

# Description of Changes

Introduces the types `TypedIndexKey` and `IndexKey` to free the table index
code from its direct dependency on `AlgebraicValue`, allowing index scans
by BSATN and by `RowRef`s without first going through
`AlgebraicValue`.
This also has the effect of optimizing string scans by avoiding
allocating in `AlgebraicValue::String`.

This also adds a byte-array-based optimization, intended for future use,
for all-primitive multi-column indexes.

Also in the future, this will enable optimizing `iter_by_col_eq`, which
is used by the frequent connect/disconnect logic.

# API and ABI breaking changes

None

# Expected complexity level and risk

3? Unsafe code and very load-bearing code.

# Testing

Should be covered by existing tests.
This commit is contained in:
Mazdak Farrokhzad
2026-03-27 13:35:22 +01:00
committed by GitHub
parent afd3f35994
commit e33cefbb2a
24 changed files with 1976 additions and 924 deletions
+18 -17
View File
@@ -56,11 +56,12 @@ impl MetadataFile {
path.write(self.to_string())
}
fn check_compatibility(previous: &Self, current: &Self) -> anyhow::Result<()> {
fn check_compatibility(previous: &Self, current: &Self, metafile: &Path) -> anyhow::Result<()> {
anyhow::ensure!(
previous.edition == current.edition,
"metadata.toml indicates that this database is from a different \
"metadata.toml at {} indicates that this database is from a different \
edition of SpacetimeDB (running {:?}, but this database is {:?})",
metafile.display(),
current.edition,
previous.edition,
);
@@ -110,8 +111,8 @@ impl MetadataFile {
/// `self` is the metadata file read from a database, and current is
/// the default metadata file that the active database version would
write to a new database.
pub fn check_compatibility_and_update(mut self, current: Self) -> anyhow::Result<Self> {
Self::check_compatibility(&self, &current)?;
pub fn check_compatibility_and_update(mut self, current: Self, metafile: &Path) -> anyhow::Result<Self> {
Self::check_compatibility(&self, &current, metafile)?;
// bump the version in the file only if it's being run in a newer database.
self.version = std::cmp::max(self.version, current.version);
Ok(self)
@@ -344,67 +345,67 @@ mod tests {
fn check_metadata_compatibility_checking() {
assert_eq!(
mkmeta(1, 0, 0)
.check_compatibility_and_update(mkmeta(1, 0, 1))
.check_compatibility_and_update(mkmeta(1, 0, 1), Path::new("metadata.toml"))
.unwrap()
.version,
mkver(1, 0, 1)
);
assert_eq!(
mkmeta(1, 0, 1)
.check_compatibility_and_update(mkmeta(1, 0, 0))
.check_compatibility_and_update(mkmeta(1, 0, 0), Path::new("metadata.toml"))
.unwrap()
.version,
mkver(1, 0, 1)
);
mkmeta(1, 1, 0)
.check_compatibility_and_update(mkmeta(1, 0, 5))
.check_compatibility_and_update(mkmeta(1, 0, 5), Path::new("metadata.toml"))
.unwrap_err();
mkmeta(2, 0, 0)
.check_compatibility_and_update(mkmeta(1, 3, 5))
.check_compatibility_and_update(mkmeta(1, 3, 5), Path::new("metadata.toml"))
.unwrap_err();
assert_eq!(
mkmeta(1, 12, 0)
.check_compatibility_and_update(mkmeta(2, 0, 0))
.check_compatibility_and_update(mkmeta(2, 0, 0), Path::new("metadata.toml"))
.unwrap()
.version,
mkver(2, 0, 0)
);
mkmeta(2, 0, 0)
.check_compatibility_and_update(mkmeta(3, 0, 0))
.check_compatibility_and_update(mkmeta(3, 0, 0), Path::new("metadata.toml"))
.unwrap_err();
}
#[test]
fn check_metadata_compatibility_prerelease() {
mkmeta(1, 9, 0)
.check_compatibility_and_update(mkmeta_pre(2, 0, 0, "rc1"))
.check_compatibility_and_update(mkmeta_pre(2, 0, 0, "rc1"), Path::new("metadata.toml"))
.unwrap();
mkmeta_pre(2, 0, 0, "rc1")
.check_compatibility_and_update(mkmeta_pre(2, 0, 0, "rc1"))
.check_compatibility_and_update(mkmeta_pre(2, 0, 0, "rc1"), Path::new("metadata.toml"))
.unwrap();
mkmeta_pre(2, 0, 0, "rc1")
.check_compatibility_and_update(mkmeta(2, 0, 1))
.check_compatibility_and_update(mkmeta(2, 0, 1), Path::new("metadata.toml"))
.unwrap();
mkmeta_pre(2, 0, 0, "rc1")
.check_compatibility_and_update(mkmeta(2, 0, 0))
.check_compatibility_and_update(mkmeta(2, 0, 0), Path::new("metadata.toml"))
.unwrap();
// Now check some failures..
mkmeta_pre(2, 0, 0, "rc1")
.check_compatibility_and_update(mkmeta_pre(2, 0, 0, "rc2"))
.check_compatibility_and_update(mkmeta_pre(2, 0, 0, "rc2"), Path::new("metadata.toml"))
.unwrap_err();
mkmeta_pre(2, 0, 0, "rc2")
.check_compatibility_and_update(mkmeta_pre(2, 0, 0, "rc1"))
.check_compatibility_and_update(mkmeta_pre(2, 0, 0, "rc1"), Path::new("metadata.toml"))
.unwrap_err();
mkmeta(2, 0, 0)
.check_compatibility_and_update(mkmeta_pre(2, 1, 0, "rc1"))
.check_compatibility_and_update(mkmeta_pre(2, 1, 0, "rc1"), Path::new("metadata.toml"))
.unwrap_err();
}
+11 -18
View File
@@ -17,7 +17,7 @@ use spacetimedb_datastore::locking_tx_datastore::datastore::TxMetrics;
use spacetimedb_datastore::locking_tx_datastore::state_view::{
IterByColEqMutTx, IterByColRangeMutTx, IterMutTx, StateView,
};
use spacetimedb_datastore::locking_tx_datastore::{MutTxId, TxId};
use spacetimedb_datastore::locking_tx_datastore::{IndexScanPointOrRange, MutTxId, TxId};
use spacetimedb_datastore::system_tables::{
system_tables, StModuleRow, ST_CLIENT_ID, ST_CONNECTION_CREDENTIALS_ID, ST_VIEW_SUB_ID,
};
@@ -55,10 +55,11 @@ use spacetimedb_snapshot::{ReconstructedSnapshot, SnapshotError, SnapshotReposit
use spacetimedb_table::indexes::RowPointer;
use spacetimedb_table::page_pool::PagePool;
use spacetimedb_table::table::{RowRef, TableScanIter};
use spacetimedb_table::table_index::IndexKey;
use std::borrow::Cow;
use std::io;
use std::num::NonZeroUsize;
use std::ops::{Bound, RangeBounds};
use std::ops::RangeBounds;
use std::sync::Arc;
use tokio::sync::watch;
@@ -1394,32 +1395,24 @@ impl RelationalDB {
Ok(self.inner.iter_by_col_range_tx(tx, table_id.into(), cols, range)?)
}
pub fn index_scan_range<'a>(
pub fn index_scan_range<'de, 'a>(
&'a self,
tx: &'a MutTx,
index_id: IndexId,
prefix: &[u8],
prefix: &'de [u8],
prefix_elems: ColId,
rstart: &[u8],
rend: &[u8],
) -> Result<
(
TableId,
Bound<AlgebraicValue>,
Bound<AlgebraicValue>,
impl Iterator<Item = RowRef<'a>> + use<'a>,
),
DBError,
> {
rstart: &'de [u8],
rend: &'de [u8],
) -> Result<(TableId, IndexScanPointOrRange<'de, 'a>), DBError> {
Ok(tx.index_scan_range(index_id, prefix, prefix_elems, rstart, rend)?)
}
pub fn index_scan_point<'a>(
pub fn index_scan_point<'a, 'p>(
&'a self,
tx: &'a MutTx,
index_id: IndexId,
point: &[u8],
) -> Result<(TableId, AlgebraicValue, impl Iterator<Item = RowRef<'a>> + use<'a>), DBError> {
point: &'p [u8],
) -> Result<(TableId, IndexKey<'p>, impl Iterator<Item = RowRef<'a>> + use<'a>), DBError> {
Ok(tx.index_scan_point(index_id, point)?)
}
+12 -6
View File
@@ -17,7 +17,7 @@ use spacetimedb_client_api_messages::energy::EnergyQuanta;
use spacetimedb_datastore::db_metrics::DB_METRICS;
use spacetimedb_datastore::execution_context::Workload;
use spacetimedb_datastore::locking_tx_datastore::state_view::StateView;
use spacetimedb_datastore::locking_tx_datastore::{FuncCallType, MutTxId};
use spacetimedb_datastore::locking_tx_datastore::{FuncCallType, IndexScanPointOrRange, MutTxId};
use spacetimedb_datastore::traits::IsolationLevel;
use spacetimedb_lib::{http as st_http, ConnectionId, Identity, Timestamp};
use spacetimedb_primitives::{ColId, ColList, IndexId, TableId};
@@ -489,9 +489,12 @@ impl InstanceEnv {
let tx = &mut *self.get_tx()?;
// Find all rows in the table to delete.
let (table_id, _, _, iter) = stdb.index_scan_range(tx, index_id, prefix, prefix_elems, rstart, rend)?;
let (table_id, iter) = stdb.index_scan_range(tx, index_id, prefix, prefix_elems, rstart, rend)?;
// Re. `SmallVec`, `delete_by_field` only cares about 1 element, so optimize for that.
let rows_to_delete = iter.map(|row_ref| row_ref.pointer()).collect::<SmallVec<[_; 1]>>();
let rows_to_delete = match iter {
IndexScanPointOrRange::Point(_, iter) => iter.map(|row_ref| row_ref.pointer()).collect(),
IndexScanPointOrRange::Range(iter) => iter.map(|row_ref| row_ref.pointer()).collect(),
};
Ok(Self::datastore_delete_by_index_scan(stdb, tx, table_id, rows_to_delete))
}
@@ -653,19 +656,22 @@ impl InstanceEnv {
let tx = &mut *self.get_tx()?;
// Open index iterator
let (table_id, lower, upper, iter) =
let (table_id, iter) =
self.relational_db()
.index_scan_range(tx, index_id, prefix, prefix_elems, rstart, rend)?;
// Scan the index and serialize rows to BSATN.
let (chunks, rows_scanned, bytes_scanned) = ChunkedWriter::collect_iter(pool, iter);
let (point, (chunks, rows_scanned, bytes_scanned)) = match iter {
IndexScanPointOrRange::Point(point, iter) => (Some(point), ChunkedWriter::collect_iter(pool, iter)),
IndexScanPointOrRange::Range(iter) => (None, ChunkedWriter::collect_iter(pool, iter)),
};
// Record the number of rows and the number of bytes scanned by the iterator.
tx.metrics.index_seeks += 1;
tx.metrics.bytes_scanned += bytes_scanned;
tx.metrics.rows_scanned += rows_scanned;
tx.record_index_scan_range(&self.func_type, table_id, index_id, lower, upper);
tx.record_index_scan_range(&self.func_type, table_id, index_id, point);
Ok(chunks)
}
@@ -12,7 +12,7 @@ use crate::{
execution_context::ExecutionContext,
locking_tx_datastore::{
mut_tx::ViewReadSets,
state_view::{iter_st_column_for_table, ApplyFilter, EqOnColumn, RangeOnColumn, ScanOrIndex},
state_view::{iter_st_column_for_table, ScanOrIndex},
IterByColRangeTx,
},
system_tables::{
@@ -51,8 +51,7 @@ use spacetimedb_table::{
blob_store::{BlobStore, HashMapBlobStore},
indexes::{RowPointer, SquashedOffset},
page_pool::PagePool,
table::{IndexScanPointIter, IndexScanRangeIter, InsertError, RowRef, Table, TableAndIndex, TableScanIter},
table_index::IndexSeekRangeResult,
table::{InsertError, RowRef, Table, TableAndIndex, TableScanIter},
};
use std::collections::BTreeMap;
use std::sync::Arc;
@@ -220,12 +219,12 @@ impl StateView for CommittedState {
cols: ColList,
range: R,
) -> Result<Self::IterByColRange<'_, R>> {
match self.index_seek_range(table_id, &cols, &range) {
let iter = self
.get_index_by_cols(table_id, &cols)
.map(|i| i.seek_range_via_algebraic_value(&range));
match iter {
Some(Ok(iter)) => Ok(ScanOrIndex::Index(iter)),
None | Some(Err(_)) => Ok(ScanOrIndex::Scan(ApplyFilter::new(
RangeOnColumn { cols, range },
self.iter(table_id)?,
))),
None | Some(Err(_)) => Ok(ScanOrIndex::scan_range(cols, range, self.iter(table_id)?)),
}
}
@@ -236,12 +235,12 @@ impl StateView for CommittedState {
val: &'r AlgebraicValue,
) -> Result<Self::IterByColEq<'a, 'r>> {
let cols = cols.into();
match self.index_seek_point(table_id, &cols, val) {
let iter = self
.get_index_by_cols(table_id, &cols)
.map(|i| i.seek_point_via_algebraic_value(val));
match iter {
Some(iter) => Ok(ScanOrIndex::Index(iter)),
None => Ok(ScanOrIndex::Scan(ApplyFilter::new(
EqOnColumn { cols, val },
self.iter(table_id)?,
))),
None => Ok(ScanOrIndex::scan_eq(cols, val, self.iter(table_id)?)),
}
}
@@ -963,45 +962,11 @@ impl CommittedState {
Some(self.get_table(table_id)?.scan_rows(&self.blob_store))
}
/// When there's an index on `cols`,
/// returns an iterator over the [TableIndex] that yields all the [`RowRef`]s
/// that match the specified `range` in the indexed column.
///
/// Matching is defined by `Ord for AlgebraicValue`.
///
/// For a unique index this will always yield at most one `RowRef`
/// when `range` is a point.
/// When there is no index this returns `None`.
pub(super) fn index_seek_range<'a>(
&'a self,
table_id: TableId,
cols: &ColList,
range: &impl RangeBounds<AlgebraicValue>,
) -> Option<IndexSeekRangeResult<IndexScanRangeIter<'a>>> {
/// Returns an index for `table_id` on `cols`, if any.
pub(super) fn get_index_by_cols(&self, table_id: TableId, cols: &ColList) -> Option<TableAndIndex<'_>> {
self.tables
.get(&table_id)?
.get_index_by_cols_with_table(&self.blob_store, cols)
.map(|i| i.seek_range(range))
}
/// When there's an index on `cols`,
/// returns an iterator over the [TableIndex] that yields all the [`RowRef`]s
/// that equal `value` in the indexed column.
///
/// Matching is defined by `Eq for AlgebraicValue`.
///
/// For a unique index this will always yield at most one `RowRef`.
/// When there is no index this returns `None`.
pub(super) fn index_seek_point<'a>(
&'a self,
table_id: TableId,
cols: &ColList,
value: &AlgebraicValue,
) -> Option<IndexScanPointIter<'a>> {
self.tables
.get(&table_id)?
.get_index_by_cols_with_table(&self.blob_store, cols)
.map(|i| i.seek_point(value))
}
/// Returns the table associated with the given `index_id`, if any.
@@ -3,7 +3,7 @@
pub mod committed_state;
pub mod datastore;
mod mut_tx;
pub use mut_tx::{FuncCallType, MutTxId, ViewCallInfo};
pub use mut_tx::{FuncCallType, IndexScanPointOrRange, MutTxId, ViewCallInfo};
mod sequence;
pub mod state_view;
pub use state_view::{IterByColEqTx, IterByColRangeTx};
@@ -10,7 +10,6 @@ use super::{
};
use crate::{
error::ViewError,
locking_tx_datastore::state_view::EqOnColumn,
system_tables::{
system_tables, ConnectionIdViaU128, IdentityViaU256, StConnectionCredentialsFields, StConnectionCredentialsRow,
StViewColumnFields, StViewFields, StViewParamFields, StViewParamRow, StViewSubFields, StViewSubRow,
@@ -32,32 +31,27 @@ use crate::{
use crate::{execution_context::ExecutionContext, system_tables::StViewColumnRow};
use crate::{execution_context::Workload, system_tables::StViewRow};
use crate::{
locking_tx_datastore::state_view::{ApplyFilter, RangeOnColumn, ScanOrIndex},
locking_tx_datastore::state_view::ScanOrIndex,
traits::{InsertFlags, RowTypeForTable, TxData, UpdateFlags},
};
use core::ops::RangeBounds;
use core::{cell::RefCell, mem};
use core::{iter, ops::Bound};
use core::{cell::RefCell, iter, mem, ops::RangeBounds};
use itertools::Either;
use smallvec::SmallVec;
use spacetimedb_data_structures::map::{HashMap, HashSet, IntMap};
use spacetimedb_durability::TxOffset;
use spacetimedb_execution::{dml::MutDatastore, Datastore, DeltaStore, Row};
use spacetimedb_lib::{db::raw_def::v9::RawSql, metrics::ExecutionMetrics, Timestamp};
use spacetimedb_lib::{
db::raw_def::v9::RawSql,
db::{auth::StAccess, raw_def::SEQUENCE_ALLOCATION_STEP},
ConnectionId, Identity,
metrics::ExecutionMetrics,
ConnectionId, Identity, Timestamp,
};
use spacetimedb_primitives::{
col_list, ArgId, ColId, ColList, ColSet, ConstraintId, IndexId, ScheduleId, SequenceId, TableId, ViewFnPtr, ViewId,
};
use spacetimedb_sats::{
bsatn::{self, to_writer, DecodeError, Deserializer},
de::{DeserializeSeed, WithBound},
memory_usage::MemoryUsage,
raw_identifier::RawIdentifier,
ser::Serialize,
AlgebraicType, AlgebraicValue, ProductType, ProductValue, WithTypespace,
bsatn::to_writer, memory_usage::MemoryUsage, raw_identifier::RawIdentifier, ser::Serialize, AlgebraicValue,
ProductType, ProductValue,
};
use spacetimedb_schema::{
def::{ModuleDef, ViewColumnDef, ViewDef, ViewParamDef},
@@ -74,7 +68,7 @@ use spacetimedb_table::{
BlobNumBytes, DuplicateError, IndexScanPointIter, IndexScanRangeIter, InsertError, RowRef, Table,
TableAndIndex, UniqueConstraintViolation,
},
table_index::{IndexCannotSeekRange, IndexSeekRangeResult, TableIndex},
table_index::{IndexCannotSeekRange, IndexKey, IndexSeekRangeResult, PointOrRange, TableIndex},
};
use std::{
marker::PhantomData,
@@ -82,8 +76,6 @@ use std::{
time::{Duration, Instant},
};
type DecodeResult<T> = core::result::Result<T, DecodeError>;
#[derive(Clone, Debug, Eq, PartialEq, Hash)]
pub struct ViewCallInfo {
pub view_id: ViewId,
@@ -280,11 +272,13 @@ impl MutTxId {
op: &FuncCallType,
table_id: TableId,
index_id: IndexId,
lower: Bound<AlgebraicValue>,
upper: Bound<AlgebraicValue>,
// The point that was scanned, if any.
// Otherwise this was a range scan
// and we'll conservatively record this as a full table scan.
point: Option<IndexKey<'_>>,
) {
if let FuncCallType::View(view) = op {
self.record_index_scan_range_inner(view, table_id, index_id, lower, upper);
self.record_index_scan_range_inner(view, table_id, index_id, point);
};
}
@@ -297,19 +291,15 @@ impl MutTxId {
view: &ViewCallInfo,
table_id: TableId,
index_id: IndexId,
lower: Bound<AlgebraicValue>,
upper: Bound<AlgebraicValue>,
point: Option<IndexKey<'_>>,
) {
// Check for precise index seek.
if let (Bound::Included(low_val), Bound::Included(up_val)) = (&lower, &upper)
&& low_val == up_val
{
self.record_index_scan_point_inner(view, table_id, index_id, low_val.clone());
return;
if let Some(point) = point {
// We got a precise index seek.
self.record_index_scan_point_inner(view, table_id, index_id, point);
} else {
// Everything else is treated as a table scan.
self.read_sets.insert_full_table_scan(table_id, view.clone());
}
// Everything else is treated as a table scan.
self.read_sets.insert_full_table_scan(table_id, view.clone());
}
/// Record that a view performs a point index scan in this transaction's read set.
@@ -319,10 +309,10 @@ impl MutTxId {
op: &FuncCallType,
table_id: TableId,
index_id: IndexId,
val: AlgebraicValue,
point: IndexKey<'_>,
) {
if let FuncCallType::View(view) = op {
self.record_index_scan_point_inner(view, table_id, index_id, val);
self.record_index_scan_point_inner(view, table_id, index_id, point);
};
}
@@ -334,15 +324,17 @@ impl MutTxId {
view: &ViewCallInfo,
table_id: TableId,
index_id: IndexId,
val: AlgebraicValue,
point: IndexKey<'_>,
) {
// Fetch index metadata
let Some((_, idx, _)) = self.get_table_and_index(index_id) else {
return;
};
let cols = idx.index().indexed_columns.clone();
self.read_sets.insert_index_scan(table_id, cols, val, view.clone());
let idx = idx.index();
let cols = idx.indexed_columns.clone();
let point = point.into_algebraic_value(&idx.key_type);
self.read_sets.insert_index_scan(table_id, cols, point, view.clone());
}
/// Returns the views whose read sets overlaps with this transaction's write set
@@ -450,7 +442,7 @@ impl Datastore for MutTxId {
.get_table_and_index(index_id)
.ok_or_else(|| IndexError::NotFound(index_id))?;
self.index_scan_range_inner(table_id, tx_index, commit_index, range)
Self::index_scan_range_via_algebraic_value(&self.tx_state, table_id, tx_index, commit_index, range)
.map_err(|IndexCannotSeekRange| IndexError::IndexCannotSeekRange(index_id).into())
}
@@ -465,7 +457,14 @@ impl Datastore for MutTxId {
.get_table_and_index(index_id)
.ok_or_else(|| IndexError::NotFound(index_id))?;
Ok(self.index_scan_point_inner(table_id, tx_index, commit_index, point))
let point = commit_index.index().key_from_algebraic_value(point);
Ok(Self::index_scan_point_inner(
&self.tx_state,
table_id,
tx_index,
commit_index,
&point,
))
}
}
@@ -1410,48 +1409,38 @@ impl MutTxId {
/// Returns an iterator yielding rows by performing a point index scan
/// on the index identified by `index_id`.
pub fn index_scan_point<'a>(
pub fn index_scan_point<'a, 'p>(
&'a self,
index_id: IndexId,
mut point: &[u8],
) -> Result<(TableId, AlgebraicValue, IndexScanPoint<'a>)> {
point: &'p [u8],
) -> Result<(TableId, IndexKey<'p>, IndexScanPoint<'a>)> {
// Extract the table id, and commit/tx indices.
let (table_id, commit_index, tx_index) = self
.get_table_and_index(index_id)
.ok_or_else(|| IndexError::NotFound(index_id))?;
// Extract the index type.
let index_ty = &commit_index.index().key_type;
// We have the index key type, so we can decode the key.
let index_ty = WithTypespace::empty(index_ty);
let point = index_ty
.deserialize(Deserializer::new(&mut point))
.map_err(IndexError::Decode)?;
// Get an index seek iterator for the tx and committed state.
let tx_iter = tx_index.map(|i| i.seek_point(&point));
let commit_iter = commit_index.seek_point(&point);
let dt = self.tx_state.get_delete_table(table_id);
let iter = ScanMutTx::combine(dt, tx_iter, commit_iter);
// Decode the key.
let point = commit_index.index().key_from_bsatn(point).map_err(IndexError::Decode)?;
// Get index seek iterators for the tx and committed state.
let iter = Self::index_scan_point_inner(&self.tx_state, table_id, tx_index, commit_index, &point);
Ok((table_id, point, iter))
}
/// See [`MutTxId::index_scan_point`].
fn index_scan_point_inner<'a>(
&'a self,
tx_state: &'a TxState,
table_id: TableId,
tx_index: Option<TableAndIndex<'a>>,
commit_index: TableAndIndex<'a>,
point: &AlgebraicValue,
point: &IndexKey<'_>,
) -> IndexScanPoint<'a> {
// Get an index seek iterator for the tx and committed state.
let tx_iter = tx_index.map(|i| i.seek_point(point));
let commit_iter = commit_index.seek_point(point);
// Combine it all.
let dt = self.tx_state.get_delete_table(table_id);
let dt = tx_state.get_delete_table(table_id);
ScanMutTx::combine(dt, tx_iter, commit_iter)
}
@@ -1461,46 +1450,66 @@ impl MutTxId {
/// The `prefix` is equated to the first `prefix_elems` values of the index key
and then the `prefix_elems`th value is bounded to the left by `rstart`
/// and to the right by `rend`.
pub fn index_scan_range<'a>(
pub fn index_scan_range<'de, 'a>(
&'a self,
index_id: IndexId,
prefix: &[u8],
prefix: &'de [u8],
prefix_elems: ColId,
rstart: &[u8],
rend: &[u8],
) -> Result<(
TableId,
Bound<AlgebraicValue>,
Bound<AlgebraicValue>,
IndexScanRanged<'a>,
)> {
rstart: &'de [u8],
rend: &'de [u8],
) -> Result<(TableId, IndexScanPointOrRange<'de, 'a>)> {
// Extract the table id, and commit/tx indices.
let (table_id, commit_index, tx_index) = self
.get_table_and_index(index_id)
.ok_or_else(|| IndexError::NotFound(index_id))?;
// Extract the index type.
let index_ty = &commit_index.index().key_type;
// We have the index key type, so we can decode everything.
let bounds =
Self::range_scan_decode_bounds(index_ty, prefix, prefix_elems, rstart, rend).map_err(IndexError::Decode)?;
// Decode the bounds.
let bounds = commit_index
.index()
.bounds_from_bsatn(prefix, prefix_elems, rstart, rend)
.map_err(IndexError::Decode)?;
let iter = self
.index_scan_range_inner(table_id, tx_index, commit_index, &bounds)
.map_err(|IndexCannotSeekRange| IndexError::IndexCannotSeekRange(index_id))?;
let (lower, upper) = bounds;
Ok((table_id, lower, upper, iter))
// Depending on whether this is a point or range bound,
// we'll either do an index point or range scan.
let iter = match bounds {
PointOrRange::Point(point) => {
let iter = Self::index_scan_point_inner(&self.tx_state, table_id, tx_index, commit_index, &point);
IndexScanPointOrRange::Point(point, iter)
}
PointOrRange::Range(start, end) => {
let bounds = (start.as_ref(), end.as_ref());
let iter = Self::index_scan_range_inner(&self.tx_state, table_id, tx_index, commit_index, &bounds)
.map_err(|IndexCannotSeekRange| IndexError::IndexCannotSeekRange(index_id))?;
IndexScanPointOrRange::Range(iter)
}
};
Ok((table_id, iter))
}
/// See [`MutTxId::index_scan_range`].
#[inline(always)]
fn index_scan_range_inner<'a>(
&'a self,
fn index_scan_range_via_algebraic_value<'a>(
tx_state: &'a TxState,
table_id: TableId,
tx_index: Option<TableAndIndex<'a>>,
commit_index: TableAndIndex<'a>,
bounds: &impl RangeBounds<AlgebraicValue>,
) -> IndexSeekRangeResult<IndexScanRanged<'a>> {
let index = commit_index.index();
let start = bounds.start_bound().map(|v| index.key_from_algebraic_value(v));
let end = bounds.end_bound().map(|v| index.key_from_algebraic_value(v));
let bounds = &(start, end);
Self::index_scan_range_inner(tx_state, table_id, tx_index, commit_index, bounds)
}
/// See [`MutTxId::index_scan_range`].
#[inline(always)]
fn index_scan_range_inner<'a, 'b>(
tx_state: &'a TxState,
table_id: TableId,
tx_index: Option<TableAndIndex<'a>>,
commit_index: TableAndIndex<'a>,
bounds: &impl RangeBounds<IndexKey<'b>>,
) -> IndexSeekRangeResult<IndexScanRanged<'a>> {
// Get an index seek iterator for the tx and committed state.
let tx_iter = tx_index.map(|i| i.seek_range(bounds)).transpose();
@@ -1513,7 +1522,7 @@ impl MutTxId {
};
// Combine it all.
let dt = self.tx_state.get_delete_table(table_id);
let dt = tx_state.get_delete_table(table_id);
Ok(ScanMutTx::combine(dt, tx_iter, commit_iter))
}
@@ -1537,104 +1546,6 @@ impl MutTxId {
Some((table_id, commit_index, tx_index))
}
/// Decode the bounds for a ranged index scan for an index typed at `key_type`.
fn range_scan_decode_bounds(
key_type: &AlgebraicType,
mut prefix: &[u8],
prefix_elems: ColId,
rstart: &[u8],
rend: &[u8],
) -> DecodeResult<(Bound<AlgebraicValue>, Bound<AlgebraicValue>)> {
match key_type {
// Multi-column index case.
AlgebraicType::Product(key_types) => {
let key_types = &key_types.elements;
// Split into types for the prefix and for the rest.
let (prefix_types, rest_types) = key_types
.split_at_checked(prefix_elems.idx())
.ok_or_else(|| DecodeError::Other("index key type has too few fields compared to prefix".into()))?;
// The `rstart` and `rend`s must be typed at `Bound<range_type>`.
// Extract that type and determine the length of the suffix.
let Some((range_type, suffix_types)) = rest_types.split_first() else {
return Err(DecodeError::Other(
"prefix length leaves no room for a range in ranged index scan".into(),
));
};
let suffix_len = suffix_types.len();
// We now have the types,
// so proceed to decoding the prefix, and the start/end bounds.
// Finally combine all of these to a single bound pair.
let prefix = bsatn::decode(prefix_types, &mut prefix)?;
let (start, end) = Self::range_scan_decode_start_end(&range_type.algebraic_type, rstart, rend)?;
Ok(Self::range_scan_combine_prefix_and_bounds(
prefix, start, end, suffix_len,
))
}
// Single-column index case. We implicitly have a PT of len 1.
_ if !prefix.is_empty() && prefix_elems.idx() != 0 => Err(DecodeError::Other(
"a single-column index cannot be prefix scanned".into(),
)),
ty => Self::range_scan_decode_start_end(ty, rstart, rend),
}
}
/// Decode `rstart` and `rend` as `Bound<ty>`.
fn range_scan_decode_start_end(
ty: &AlgebraicType,
mut rstart: &[u8],
mut rend: &[u8],
) -> DecodeResult<(Bound<AlgebraicValue>, Bound<AlgebraicValue>)> {
let range_type = WithBound(WithTypespace::empty(ty));
let range_start = range_type.deserialize(Deserializer::new(&mut rstart))?;
let range_end = range_type.deserialize(Deserializer::new(&mut rend))?;
Ok((range_start, range_end))
}
/// Combines `prefix` equality constraints with `start` and `end` bounds
/// filling with `suffix_len` to ensure that the number of fields matches
/// that of the index type.
fn range_scan_combine_prefix_and_bounds(
prefix: ProductValue,
start: Bound<AlgebraicValue>,
end: Bound<AlgebraicValue>,
suffix_len: usize,
) -> (Bound<AlgebraicValue>, Bound<AlgebraicValue>) {
let prefix_is_empty = prefix.elements.is_empty();
// Concatenate prefix, value, and the most permissive value for the suffix.
let concat = |prefix: ProductValue, val, fill| {
let mut vals: Vec<_> = prefix.elements.into();
vals.reserve(1 + suffix_len);
vals.push(val);
vals.extend(iter::repeat_n(fill, suffix_len));
AlgebraicValue::product(vals)
};
// The start endpoint needs `Min` as the suffix-filling element,
// as it imposes the least and acts like `Unbounded`.
let concat_start = |val| concat(prefix.clone(), val, AlgebraicValue::Min);
let range_start = match start {
Bound::Included(r) => Bound::Included(concat_start(r)),
Bound::Excluded(r) => Bound::Excluded(concat_start(r)),
// Prefix is empty, and suffix will be `Min`,
// so simplify `(Min, Min, ...)` to `Unbounded`.
Bound::Unbounded if prefix_is_empty => Bound::Unbounded,
Bound::Unbounded => Bound::Included(concat_start(AlgebraicValue::Min)),
};
// The end endpoint needs `Max` as the suffix-filling element,
// as it imposes the least and acts like `Unbounded`.
let concat_end = |val| concat(prefix, val, AlgebraicValue::Max);
let range_end = match end {
Bound::Included(r) => Bound::Included(concat_end(r)),
Bound::Excluded(r) => Bound::Excluded(concat_end(r)),
// Prefix is empty, and suffix will be `Max`,
// so simplify `(Max, Max, ...)` to `Unbounded`.
Bound::Unbounded if prefix_is_empty => Bound::Unbounded,
Bound::Unbounded => Bound::Included(concat_end(AlgebraicValue::Max)),
};
(range_start, range_end)
}
pub fn get_next_sequence_value(&mut self, seq_id: SequenceId) -> Result<i128> {
get_next_sequence_value(
&mut self.tx_state,
@@ -1645,6 +1556,16 @@ impl MutTxId {
}
}
/// Either a point or range index scan iterator.
/// Produced by [`MutTxId::index_scan_range`].
pub enum IndexScanPointOrRange<'de, 'a> {
/// A point scan iterator,
/// with the key included as it's needed by views (read sets).
Point(IndexKey<'de>, IndexScanPoint<'a>),
/// A range scan iterator.
Range(IndexScanRanged<'a>),
}
fn get_sequence_mut(seq_state: &mut SequencesState, seq_id: SequenceId) -> Result<&mut Sequence> {
seq_state
.get_sequence_mut(seq_id)
@@ -3018,10 +2939,12 @@ impl MutTxId {
throw!(IndexError::NotUnique(index_id));
}
// Project the row to the index's type.
// Derive the key of `tx_row_ref` for `commit_index`.
// SAFETY: `tx_row_ref`'s table is derived from `commit_index`'s table,
// so all `index.indexed_columns` will be in-bounds of the row layout.
let index_key = unsafe { tx_row_ref.project_unchecked(&commit_index.indexed_columns) };
// so the row layouts match and thus,
// `commit_index`'s key type is the same as the type of `row_ref`
// projected to `commit_index.indexed_columns`.
let index_key = unsafe { commit_index.key_from_row(tx_row_ref) };
// Try to find the old row first in the committed state using the `index_key`.
let mut old_commit_del_ptr = None;
@@ -3131,6 +3054,9 @@ impl MutTxId {
tx_row_ptr
} else {
let index_key = tx_row_ref
.project(&commit_index.indexed_columns)
.expect("`tx_row_ref` should be compatible with `commit_index`");
throw!(IndexError::KeyNotFound(index_id, index_key));
};
@@ -3330,23 +3256,23 @@ fn iter_by_col_range<'a, R: RangeBounds<AlgebraicValue>>(
cols: ColList,
range: R,
) -> Result<IterByColRangeMutTx<'a, R>> {
// If there's an index, use that.
// If there's an index that is compatible with a range scan, use that.
// It's sufficient to check that the committed state has an index
// as index schema changes are applied immediately.
if let Some(Ok(commit_iter)) = committed_state.index_seek_range(table_id, &cols, &range) {
let tx_iter = tx_state
.index_seek_range_by_cols(table_id, &cols, &range)
.map(|r| r.expect("got a commit index so we should have a compatible tx index"));
let delete_table = tx_state.get_delete_table(table_id);
let iter = ScanMutTx::combine(delete_table, tx_iter, commit_iter);
Ok(ScanOrIndex::Index(iter))
} else {
unindexed_iter_by_col_range_warn(tx_state, committed_state, table_id, &cols);
let iter = iter(tx_state, committed_state, table_id)?;
let filter = RangeOnColumn { cols, range };
let iter = ApplyFilter::new(filter, iter);
Ok(ScanOrIndex::Scan(iter))
if let Some(commit_index) = committed_state.get_index_by_cols(table_id, &cols) {
let tx_index = tx_state.get_index_by_cols(table_id, &cols);
if let Ok(iter) =
MutTxId::index_scan_range_via_algebraic_value(tx_state, table_id, tx_index, commit_index, &range)
{
return Ok(ScanOrIndex::Index(iter));
}
}
// No index found or it wasn't compatible with a range scan,
// so do a full scan and filter.
unindexed_iter_by_col_range_warn(tx_state, committed_state, table_id, &cols);
let iter = iter(tx_state, committed_state, table_id)?;
Ok(ScanOrIndex::scan_range(cols, range, iter))
}
#[cfg(not(feature = "unindexed_iter_by_col_range_warn"))]
@@ -3379,17 +3305,15 @@ fn iter_by_col_eq<'a, 'r>(
// It's sufficient to check that the committed state has an index
// as index schema changes are applied immediately.
let cols = cols.into();
if let Some(commit_iter) = committed_state.index_seek_point(table_id, &cols, val) {
let tx_iter = tx_state.index_seek_point_by_cols(table_id, &cols, val);
let delete_table = tx_state.get_delete_table(table_id);
let iter = ScanMutTx::combine(delete_table, tx_iter, commit_iter);
if let Some(commit_index) = committed_state.get_index_by_cols(table_id, &cols) {
let tx_index = tx_state.get_index_by_cols(table_id, &cols);
let key = commit_index.index().key_from_algebraic_value(val);
let iter = MutTxId::index_scan_point_inner(tx_state, table_id, tx_index, commit_index, &key);
Ok(ScanOrIndex::Index(iter))
} else {
unindexed_iter_by_col_eq_warn(tx_state, committed_state, table_id, &cols);
let iter = iter(tx_state, committed_state, table_id)?;
let filter = EqOnColumn { cols, val };
let iter = ApplyFilter::new(filter, iter);
Ok(ScanOrIndex::Scan(iter))
Ok(ScanOrIndex::scan_eq(cols, val, iter))
}
}
@@ -495,6 +495,20 @@ pub enum ScanOrIndex<S, I> {
Index(I),
}
impl<R, I, Idx> ScanOrIndex<ApplyFilter<RangeOnColumn<R>, I>, Idx> {
    /// Wraps `iter` in the `Scan` variant with a [`RangeOnColumn`] filter,
    /// keeping only rows whose projection onto `cols` falls within `range`.
    pub(super) fn scan_range(cols: ColList, range: R, iter: I) -> Self {
        let filter = RangeOnColumn { cols, range };
        Self::Scan(ApplyFilter::new(filter, iter))
    }
}
impl<'r, I, Idx> ScanOrIndex<ApplyFilter<EqOnColumn<'r>, I>, Idx> {
    /// Wraps `iter` in the `Scan` variant with an [`EqOnColumn`] filter,
    /// keeping only rows whose projection onto `cols` equals `val`.
    pub(super) fn scan_eq(cols: ColList, val: &'r AlgebraicValue, iter: I) -> Self {
        let filter = EqOnColumn { cols, val };
        Self::Scan(ApplyFilter::new(filter, iter))
    }
}
impl<'a, S, I> Iterator for ScanOrIndex<S, I>
where
S: Iterator<Item = RowRef<'a>>,
@@ -67,7 +67,7 @@ impl Datastore for TxId {
index_id: IndexId,
range: &impl RangeBounds<AlgebraicValue>,
) -> anyhow::Result<Self::RangeIndexIter<'a>> {
self.with_index(table_id, index_id, |i| i.seek_range(range))?
self.with_index(table_id, index_id, |i| i.seek_range_via_algebraic_value(range))?
.map_err(|IndexCannotSeekRange| IndexError::IndexCannotSeekRange(index_id).into())
}
@@ -77,7 +77,7 @@ impl Datastore for TxId {
index_id: IndexId,
point: &AlgebraicValue,
) -> anyhow::Result<Self::PointIndexIter<'a>> {
self.with_index(table_id, index_id, |i| i.seek_point(point))
self.with_index(table_id, index_id, |i| i.seek_point_via_algebraic_value(point))
}
}
@@ -1,17 +1,16 @@
use super::{delete_table::DeleteTable, sequence::Sequence};
use core::ops::RangeBounds;
use spacetimedb_data_structures::map::IntMap;
use spacetimedb_lib::db::auth::StAccess;
use spacetimedb_primitives::{ColList, ConstraintId, IndexId, SequenceId, TableId};
use spacetimedb_sats::{memory_usage::MemoryUsage, AlgebraicValue};
use spacetimedb_sats::memory_usage::MemoryUsage;
use spacetimedb_schema::schema::{ColumnSchema, ConstraintSchema, IndexSchema, SequenceSchema};
use spacetimedb_table::{
blob_store::{BlobStore, HashMapBlobStore},
indexes::{RowPointer, SquashedOffset},
pointer_map::PointerMap,
static_assert_size,
table::{IndexScanPointIter, IndexScanRangeIter, RowRef, Table, TableAndIndex},
table_index::{IndexSeekRangeResult, TableIndex},
table::{RowRef, Table, TableAndIndex},
table_index::TableIndex,
};
use std::collections::{btree_map, BTreeMap};
use thin_vec::ThinVec;
@@ -168,45 +167,11 @@ impl TxState {
(ins_count, del_count)
}
/// When there's an index on `cols`,
/// returns an iterator over the `TableIndex` that yields all the [`RowRef`]s
/// that match the specified `range` in the indexed column.
///
/// Matching is defined by `Ord for AlgebraicValue`.
///
/// For a unique index this will always yield at most one `RowRef`
/// when `range` is a point.
/// When there is no index this returns `None`.
pub(super) fn index_seek_range_by_cols<'a>(
&'a self,
table_id: TableId,
cols: &ColList,
range: &impl RangeBounds<AlgebraicValue>,
) -> Option<IndexSeekRangeResult<IndexScanRangeIter<'a>>> {
/// Returns an index for `table_id` on `cols`, if any.
pub(super) fn get_index_by_cols(&self, table_id: TableId, cols: &ColList) -> Option<TableAndIndex<'_>> {
self.insert_tables
.get(&table_id)?
.get_index_by_cols_with_table(&self.blob_store, cols)
.map(|i| i.seek_range(range))
}
/// When there's an index on `cols`,
/// returns an iterator over the `TableIndex` that yields all the [`RowRef`]s
/// that match the specified `range` in the indexed column.
///
/// Matching is defined by `Eq for AlgebraicValue`.
///
/// For a unique index this will always yield at most one `RowRef`.
/// When there is no index this returns `None`.
pub(super) fn index_seek_point_by_cols<'a>(
&'a self,
table_id: TableId,
cols: &ColList,
point: &AlgebraicValue,
) -> Option<IndexScanPointIter<'a>> {
self.insert_tables
.get(&table_id)?
.get_index_by_cols_with_table(&self.blob_store, cols)
.map(|i| i.seek_point(point))
}
/// Returns the table for `table_id` combined with the index for `index_id`, if both exist.
+2
View File
@@ -477,6 +477,8 @@ impl<'de, T: Deserialize<'de>, U: Deserialize<'de>> VariantVisitor<'de> for Resu
}
}
impl_deserialize!([T: Deserialize<'de>] Bound<T>, de => WithBound(PhantomData).deserialize(de));
/// A visitor/seed used to deserialize a `Bound<T>`.
///
/// Wraps the inner seed `S` that deserializes the bound's value
/// (e.g., `PhantomData` for plain `Deserialize` types, as in the
/// `impl_deserialize!` for `Bound<T>` above).
#[derive(Clone, Copy)]
pub struct WithBound<S>(pub S);
+1 -1
View File
@@ -212,7 +212,7 @@ fn generate_array_value(ty: AlgebraicType) -> BoxedStrategy<ArrayValue> {
}
}
fn gen_with<T: Clone + Debug, US: Strategy>(
pub fn gen_with<T: Clone + Debug, US: Strategy>(
with: impl Strategy<Value = T>,
then: impl Fn(T) -> US,
) -> impl Strategy<Value = (T, US::Value)> {
+1 -1
View File
@@ -67,7 +67,7 @@ impl StandaloneEnv {
let meta_path = data_dir.metadata_toml();
let mut meta = MetadataFile::new("standalone");
if let Some(existing_meta) = MetadataFile::read(&meta_path).context("failed reading metadata.toml")? {
meta = existing_meta.check_compatibility_and_update(meta)?;
meta = existing_meta.check_compatibility_and_update(meta, meta_path.as_ref())?;
}
meta.write(&meta_path).context("failed writing metadata.toml")?;
+5 -6
View File
@@ -798,11 +798,10 @@ fn insert_num_same<R: IndexedRow>(
}
fn clear_all_same<R: IndexedRow>(tbl: &mut Table, index_id: IndexId, val_same: u64) {
let ptrs = tbl
.get_index_by_id(index_id)
.unwrap()
.seek_point(&R::column_value_from_u64(val_same))
.collect::<Vec<_>>();
let index = tbl.get_index_by_id(index_id).unwrap();
let key = R::column_value_from_u64(val_same);
let key = index.key_from_algebraic_value(&key);
let ptrs = index.seek_point(&key).collect::<Vec<_>>();
for ptr in ptrs {
tbl.delete(&mut NullBlobStore, ptr, |_| ()).unwrap();
}
@@ -919,7 +918,7 @@ fn index_seek(c: &mut Criterion) {
let mut elapsed = WallTime.zero();
for _ in 0..num_iters {
let (row, none) = time(&mut elapsed, || {
let mut iter = index.seek_range(&col_to_seek).unwrap();
let mut iter = index.seek_point_via_algebraic_value(&col_to_seek);
(iter.next(), iter.next())
});
assert!(
File diff suppressed because one or more lines are too long
@@ -7,3 +7,6 @@
cc 3276d3db4a1a70d78db9a6a01eaa3bba810a2317e9c67e4d5d8d93cbba472c99 # shrinks to ((ty, cols, pv), is_unique) = ((ProductType {None: Bool}, [ColId(0)], ProductValue { elements: [Bool(false)] }), false)
cc bc80b80ac2390452c0a152d2c6e2abc29ce146642f3cd0fe136ffe6173cf4c8c # shrinks to (ty, cols, pv) = (ProductType {None: I64}, [ColId(0)], ProductValue { elements: [I64(0)] }), kind = Direct
cc c1e4c959a32f6ab8ef9c4e29d39a24ec47cb03524584606a7f1fa4563f0f8cca # shrinks to (ty, cols, pv) = (ProductType {None: Sum(SumType {"variant_0": Product(ProductType {})})}, [ColId(0)], ProductValue { elements: [Sum(SumValue { tag: 0, value: Product(ProductValue { elements: [] }) })] }), kind = Direct
cc 4cb325be8b24c9efa5b1f20b9504d044d9dd110eb9e99355de4ca42f9cfc20b4 # shrinks to (ty, cols, pv) = (ProductType {None: Sum(SumType {"variant_0": Product(ProductType {})})}, [ColId(0)], ProductValue { elements: [Sum(SumValue { tag: 0, value: Product(ProductValue { elements: [] }) })] }), is_unique = false
cc a166a3c619c7cae3938f4e0cfb4e7a96cddfbb7943efd0b74e8cbb99d7a1e6a8 # shrinks to (ty, cols, pv) = (ProductType {None: U8}, [ColId(0)], ProductValue { elements: [U8(0)] }), kind = Direct
cc 05390c104810e7086fa5d3f3cac7f491a377ae6ba64431661fd94662e28d1fca # shrinks to (ty, cols, pv) = (ProductType {None: Sum(SumType {"variant_0": Product(ProductType {})})}, [ColId(0)], ProductValue { elements: [Sum(SumValue { tag: 0, value: Product(ProductValue { elements: [] }) })] }), kind = Direct
+80 -16
View File
@@ -11,10 +11,12 @@ use super::{
};
use core::cell::Cell;
use core::str;
use spacetimedb_primitives::ColList;
use spacetimedb_sats::{
i256, impl_serialize,
layout::{
align_to, AlgebraicTypeLayout, HasLayout as _, ProductTypeLayoutView, RowTypeLayout, SumTypeLayout, VarLenType,
align_to, AlgebraicTypeLayout, HasLayout as _, ProductTypeElementLayout, ProductTypeLayoutView, RowTypeLayout,
SumTypeLayout, VarLenType,
},
ser::{SerializeNamedProduct, Serializer},
u256, ArrayType,
@@ -44,6 +46,46 @@ pub unsafe fn serialize_row_from_page<S: Serializer>(
unsafe { serialize_product(ser, fixed_bytes, page, blob_store, &Cell::new(0), ty.product()) }
}
/// Serializes the columns `cols` of the row in `page`
/// where the fixed part of the row starts at `fixed_offset`
/// and lasts `ty.size()` bytes. This region is typed at `ty`.
///
/// Only the requested columns are written, in the order given by `cols`;
/// the other columns of the row are skipped entirely.
///
/// # Safety
///
/// 1. the `fixed_offset` must point at a row in `page` lasting `ty.size()` bytes.
/// 2. the row must be a valid `ty`.
/// 3. for any `vlr: VarLenRef` stored in the row,
///    `vlr.first_offset` must either be `NULL` or point to a valid granule in `page`.
/// 4. any `col` in `cols` must be in-bounds of `ty`'s layout.
pub unsafe fn serialize_columns_from_page<S: Serializer>(
    ser: S,
    page: &Page,
    blob_store: &dyn BlobStore,
    fixed_offset: PageOffset,
    ty: &RowTypeLayout,
    cols: &ColList,
) -> Result<S::Ok, S::Error> {
    let bytes = page.get_row_data(fixed_offset, ty.size());
    let elems = &*ty.elements;
    // NOTE(review): the declared element count is `elems.len()`, but only
    // `cols` elements are actually serialized below. This is fine for
    // serializers that ignore the count (e.g., BSATN) — confirm no counted
    // serializer is routed through here.
    let mut ser = ser.serialize_named_product(elems.len())?;
    for col in cols.iter() {
        let col_idx = col.idx();
        // SAFETY: per 4. caller promised that any `col` is in-bounds of `ty`'s layout.
        let elem_ty = unsafe { elems.get_unchecked(col_idx) };
        // Offset of this column's fixed data relative to the row start.
        let offset = elem_ty.offset as usize;
        // SAFETY:
        // 1. `value` was valid at `ty` so we know
        //    `sub_val = &bytes[range_move(0..elem_ty.ty.size(), offset)]`
        //    is valid at `elem_ty.ty`, as `elem_ty`.
        // 2. forward caller requirement.
        unsafe { serialize_product_field(&mut ser, bytes, page, blob_store, offset, elem_ty) }?;
    }
    ser.end()
}
/// The current read offset into the row's fixed bytes, shared by nested
/// serializer calls.
///
/// This has to be a `Cell<_>` (interior mutability) here as we only get
/// `&Value` in `Serialize`, so the offset cannot be threaded through as
/// a `&mut` argument.
type CurrOffset<'a> = &'a Cell<usize>;
@@ -70,30 +112,52 @@ unsafe fn serialize_product<S: Serializer>(
curr_offset: CurrOffset<'_>,
ty: ProductTypeLayoutView<'_>,
) -> Result<S::Ok, S::Error> {
let elems = &ty.elements;
let elems = ty.elements;
let mut ser = ser.serialize_named_product(elems.len())?;
let my_offset = curr_offset.get();
for elem_ty in elems.iter() {
curr_offset.set(my_offset + elem_ty.offset as usize);
// SAFETY: By 1., `value` is valid at `ty`,
// so it follows that valid and properly aligned sub-`value`s
// are valid `elem_ty.ty`s.
// By 2., and the above, it follows that sub-`value`s won't have dangling `VarLenRef`s.
let value = Value {
bytes,
page,
blob_store,
curr_offset,
ty: &elem_ty.ty,
};
ser.serialize_element(elem_ty.name.as_deref(), &value)?;
let offset = my_offset + elem_ty.offset as usize;
// SAFETY:
// 1. `value` was valid at `ty` so we know
// `sub_val = &bytes[range_move(0..elem_ty.ty.size(), offset)]`
// is valid at `elem_ty.ty`, as `elem_ty`.
// 2. forward caller requirement.
unsafe { serialize_product_field(&mut ser, bytes, page, blob_store, offset, elem_ty) }?;
}
ser.end()
}
/// Serializes one product field, located at
/// `value = &bytes[range_move(0..elem_ty.ty.size(), offset)]`
/// and typed at `elem_ty`, into `ser`.
///
/// # Safety
///
/// 1. the field bytes at `offset` must be valid at type `elem_ty.ty`
///    and properly aligned for it.
/// 2. for any `vlr: VarLenRef` stored in the field,
///    `vlr.first_offset` must either be `NULL` or point to a valid granule in `page`.
unsafe fn serialize_product_field<S: SerializeNamedProduct>(
    ser: &mut S,
    bytes: &Bytes,
    page: &Page,
    blob_store: &dyn BlobStore,
    offset: usize,
    elem_ty: &ProductTypeElementLayout,
) -> Result<(), S::Error> {
    // SAFETY: By 1., the bytes at `offset` are valid at `elem_ty.ty`,
    // so it follows that valid and properly aligned sub-`value`s
    // are valid `elem_ty.ty`s.
    // By 2., and the above, it follows that sub-`value`s won't have dangling `VarLenRef`s.
    let value = Value {
        bytes,
        page,
        blob_store,
        // Fresh offset cell: this field is serialized independently of its siblings.
        curr_offset: &Cell::new(offset),
        ty: &elem_ty.ty,
    };
    ser.serialize_element(elem_ty.name.as_deref(), &value)
}
/// Serializes the sum value in `value = &bytes[range_move(0..ty.size(), *curr_offset)]`,
/// which is typed at `ty`, into `ser`.
///
+72 -28
View File
@@ -1,12 +1,7 @@
use crate::{
blob_store::NullBlobStore,
table_index::{IndexCannotSeekRange, IndexKind},
};
use super::{
bflatn_from::serialize_row_from_page,
bflatn_from::{serialize_columns_from_page, serialize_row_from_page},
bflatn_to::{write_row_to_pages, write_row_to_pages_bsatn, Error},
blob_store::BlobStore,
blob_store::{BlobStore, NullBlobStore},
eq::eq_row_in_page,
eq_to_pv::eq_row_in_page_to_pv,
indexes::{Bytes, PageIndex, PageOffset, RowHash, RowPointer, SquashedOffset, PAGE_DATA_SIZE},
@@ -20,7 +15,7 @@ use super::{
static_assert_size,
static_bsatn_validator::{static_bsatn_validator, validate_bsatn, StaticBsatnValidator},
static_layout::StaticLayout,
table_index::{TableIndex, TableIndexPointIter, TableIndexRangeIter},
table_index::{IndexCannotSeekRange, IndexKey, IndexKind, TableIndex, TableIndexPointIter, TableIndexRangeIter},
var_len::VarLenMembers,
};
use core::{fmt, ptr};
@@ -560,12 +555,13 @@ impl Table {
mut is_deleted: impl FnMut(RowPointer) -> bool,
) -> Result<(), UniqueConstraintViolation> {
for (&index_id, index) in adapt(self.indexes.iter()).filter(|(_, index)| index.is_unique()) {
// SAFETY: Caller promised that `row´ has the same layout as `self`.
// Thus, as `index.indexed_columns` is in-bounds of `self`'s layout,
// it's also in-bounds of `row`'s layout.
let value = unsafe { row.project_unchecked(&index.indexed_columns) };
if index.seek_point(&value).next().is_some_and(|ptr| !is_deleted(ptr)) {
return Err(self.build_error_unique(index, index_id, value));
// SAFETY: Caller promised that `row` has the same layout as `self`.
// The projection of `row`'s type onto the index's columns
// is therefore the same as the key type.
let key = unsafe { index.key_from_row(row) };
if index.seek_point(&key).next().is_some_and(|ptr| !is_deleted(ptr)) {
return Err(self.build_error_unique(index, index_id, row));
}
}
Ok(())
@@ -915,8 +911,7 @@ impl Table {
}
let index = self.indexes.get(&index_id).unwrap();
let value = new.project(&index.indexed_columns).unwrap();
let error = self.build_error_unique(index, index_id, value).into();
let error = self.build_error_unique(index, index_id, new).into();
(index_id, error)
})
.map_err(|(index_id, error)| {
@@ -959,10 +954,13 @@ impl Table {
.expect("there should be at least one unique index");
// Project the needle row to the columns of the index, and then seek.
// As this is a unique index, there are 0-1 rows for this key.
// SAFETY: `needle_table.is_row_present(needle_ptr)` holds.
let needle_row = unsafe { needle_table.get_row_ref_unchecked(needle_bs, needle_ptr) };
let key = needle_row
.project(&target_index.indexed_columns)
.expect("needle row should be valid");
// SAFETY: Caller promised that the row layout of both tables are the same.
// As `target_index` comes from `target_table`,
// it follows that `needle_row`'s type projected to `target_index`'s columns
// is the same as the index's key type.
let key = unsafe { target_index.key_from_row(needle_row) };
target_index.seek_point(&key).next().filter(|&target_ptr| {
// SAFETY:
// - Caller promised that the row layouts were the same.
@@ -1252,7 +1250,8 @@ impl Table {
let row_ref = unsafe { self.inner.get_row_ref_unchecked(blob_store, self.squashed_offset, ptr) };
for (_, index) in self.indexes.range_mut(..index_id) {
index.delete(row_ref).unwrap();
// SAFETY: any index in this table was constructed with the same row type as this table.
unsafe { index.delete(row_ref) };
}
}
@@ -1264,7 +1263,8 @@ impl Table {
let row_ref = unsafe { self.inner.get_row_ref_unchecked(blob_store, self.squashed_offset, ptr) };
for index in self.indexes.values_mut() {
index.delete(row_ref).unwrap();
// SAFETY: any index in this table was constructed with the same row type as this table.
unsafe { index.delete(row_ref) };
}
}
@@ -1757,6 +1757,21 @@ impl<'a> RowRef<'a> {
T::read_column(self, col.into().idx())
}
/// Serializes the `cols` of `self` using `ser`.
///
/// # Safety
///
/// Any `col` in `cols` is in-bounds of `self`'s layout.
pub unsafe fn serialize_columns_unchecked<S: Serializer>(self, cols: &ColList, ser: S) -> Result<S::Ok, S::Error> {
let table = self.table;
let (page, offset) = table.page_and_offset(self.pointer);
// SAFETY:
// - We have a `RowRef`, so `ptr` points to a valid row in this table
// so safety requirements 1-3 flow from that.
// - Caller promised that any `col` in `cols` is in-bounds of `self`'s layout.
unsafe { serialize_columns_from_page(ser, page, self.blob_store, offset, &table.row_layout, cols) }
}
/// Construct a projection of the row at `self` by extracting the `cols`.
///
/// If `cols` contains zero or more than one column, the values of the projected columns are wrapped in a [`ProductValue`].
@@ -1766,7 +1781,7 @@ impl<'a> RowRef<'a> {
///
/// - `cols` must not specify any column which is out-of-bounds for the row `self`.
pub unsafe fn project_unchecked(self, cols: &ColList) -> AlgebraicValue {
let col_layouts = &self.row_layout().product().elements;
let col_layouts = self.row_layout().product().elements;
if let Some(head) = cols.as_singleton() {
let head = head.idx();
@@ -1915,7 +1930,8 @@ impl Serialize for RowRef<'_> {
fn serialize<S: Serializer>(&self, ser: S) -> Result<S::Ok, S::Error> {
let table = self.table;
let (page, offset) = table.page_and_offset(self.pointer);
// SAFETY: `ptr` points to a valid row in this table per above check.
// SAFETY: We have a `RowRef`, so `ptr` points to a valid row in this table
// so safety requirements 1-3 flow from that.
unsafe { serialize_row_from_page(ser, page, self.blob_store, offset, &table.row_layout) }
}
}
@@ -2098,7 +2114,7 @@ impl<'a> TableAndIndex<'a> {
/// Returns an iterator yielding all rows in this index for `key`.
///
/// Matching is defined by `Eq for AlgebraicValue`.
pub fn seek_point(&self, key: &AlgebraicValue) -> IndexScanPointIter<'a> {
pub fn seek_point(&self, key: &IndexKey<'_>) -> IndexScanPointIter<'a> {
IndexScanPointIter {
table: self.table,
blob_store: self.blob_store,
@@ -2110,9 +2126,9 @@ impl<'a> TableAndIndex<'a> {
/// if the index is compatible with range seeks.
///
/// Matching is defined by `Ord for AlgebraicValue`.
pub fn seek_range(
pub fn seek_range<'b>(
&self,
range: &impl RangeBounds<AlgebraicValue>,
range: &impl RangeBounds<IndexKey<'b>>,
) -> Result<IndexScanRangeIter<'a>, IndexCannotSeekRange> {
Ok(IndexScanRangeIter {
table: self.table,
@@ -2120,6 +2136,33 @@ impl<'a> TableAndIndex<'a> {
btree_index_iter: self.index.seek_range(range)?,
})
}
/// Returns an iterator yielding all rows in this index for `key`,
/// given as an [`AlgebraicValue`] rather than a native [`IndexKey`].
///
/// Matching is defined by `Eq for AlgebraicValue`.
pub fn seek_point_via_algebraic_value(&self, key: &AlgebraicValue) -> IndexScanPointIter<'a> {
    // Translate the value into the index's native key representation first.
    let index_key = self.index.key_from_algebraic_value(key);
    self.seek_point(&index_key)
}
/// Returns an iterator yielding all rows in this index that fall within `range`,
/// if the index is compatible with range seeks.
/// The bounds are given as [`AlgebraicValue`]s rather than native [`IndexKey`]s.
///
/// Matching is defined by `Ord for AlgebraicValue`.
pub fn seek_range_via_algebraic_value(
    &self,
    range: &impl RangeBounds<AlgebraicValue>,
) -> Result<IndexScanRangeIter<'a>, IndexCannotSeekRange> {
    // Translate each endpoint into the index's native key representation,
    // preserving the inclusive/exclusive/unbounded kind of each bound.
    let lo = range.start_bound().map(|v| self.index.key_from_algebraic_value(v));
    let hi = range.end_bound().map(|v| self.index.key_from_algebraic_value(v));
    let btree_index_iter = self.index.seek_range(&(lo, hi))?;
    Ok(IndexScanRangeIter {
        table: self.table,
        blob_store: self.blob_store,
        btree_index_iter,
    })
}
}
/// An iterator using a [`TableIndex`] to scan a `table`
@@ -2232,14 +2275,15 @@ impl UniqueConstraintViolation {
// Private API:
impl Table {
/// Returns a unique constraint violation error for the given `index`
/// and the `value` that would have been duplicated.
/// and the `row` that caused the violation.
#[cold]
pub fn build_error_unique(
&self,
index: &TableIndex,
index_id: IndexId,
value: AlgebraicValue,
row: RowRef<'_>,
) -> UniqueConstraintViolation {
let value = row.project(&index.indexed_columns).unwrap();
let schema = self.get_schema();
UniqueConstraintViolation::build(schema, index, index_id, value)
}
+74 -19
View File
@@ -1,6 +1,7 @@
use super::same_key_entry::{same_key_iter, SameKeyEntry, SameKeyEntryIter};
use super::{key_size::KeyBytesStorage, Index, KeySize, RangedIndex};
use crate::indexes::RowPointer;
use core::borrow::Borrow;
use core::ops::RangeBounds;
use spacetimedb_sats::memory_usage::MemoryUsage;
use std::collections::btree_map::{BTreeMap, Range};
@@ -56,7 +57,7 @@ impl<K: Ord + KeySize> Index for BTreeIndex<K> {
/// and multimaps do not bind one `key` to the same `ptr`.
fn insert(&mut self, key: Self::Key, ptr: RowPointer) -> Result<(), RowPointer> {
self.num_rows += 1;
self.num_key_bytes.add_to_key_bytes::<Self>(&key);
self.num_key_bytes.add_to_key_bytes(&key);
self.map.entry(key).or_default().push(ptr);
Ok(())
}
@@ -65,22 +66,7 @@ impl<K: Ord + KeySize> Index for BTreeIndex<K> {
///
/// Returns whether `key -> ptr` was present.
fn delete(&mut self, key: &K, ptr: RowPointer) -> bool {
let Some(vset) = self.map.get_mut(key) else {
return false;
};
let (deleted, is_empty) = vset.delete(ptr);
if is_empty {
self.map.remove(key);
}
if deleted {
self.num_rows -= 1;
self.num_key_bytes.sub_from_key_bytes::<Self>(key);
}
deleted
self.delete(key, ptr)
}
type PointIter<'a>
@@ -88,8 +74,8 @@ impl<K: Ord + KeySize> Index for BTreeIndex<K> {
where
Self: 'a;
fn seek_point(&self, key: &Self::Key) -> Self::PointIter<'_> {
same_key_iter(self.map.get(key))
fn seek_point(&self, point: &Self::Key) -> Self::PointIter<'_> {
self.seek_point(point)
}
fn num_keys(&self) -> usize {
@@ -118,6 +104,57 @@ impl<K: Ord + KeySize> Index for BTreeIndex<K> {
}
}
impl<K: KeySize + Ord> BTreeIndex<K> {
    /// See [`Index::delete`].
    ///
    /// This version has relaxed bounds,
    /// where relaxed means that the key type can be any `Q`
    /// that the index's key type borrows as,
    /// and need not be `Index::Key` itself.
    /// This allows e.g., queries with `&str` rather than providing an owned string key.
    /// This can be exploited to avoid heap allocations in some situations,
    /// e.g., borrowing the input directly from BSATN.
    /// This is similar to the bounds on [`BTreeMap::remove`].
    pub fn delete<Q>(&mut self, key: &Q, ptr: RowPointer) -> bool
    where
        Q: ?Sized + KeySize + Ord,
        <Self as Index>::Key: Borrow<Q>,
    {
        // No entry for `key` at all means `key -> ptr` cannot have been present.
        let Some(vset) = self.map.get_mut(key) else {
            return false;
        };
        let (deleted, is_empty) = vset.delete(ptr);
        // Remove the whole map entry once the last pointer for `key` is gone,
        // so empty entries don't accumulate.
        if is_empty {
            self.map.remove(key);
        }
        // Only adjust the statistics when `key -> ptr` was actually present.
        if deleted {
            self.num_rows -= 1;
            self.num_key_bytes.sub_from_key_bytes(key);
        }
        deleted
    }

    /// See [`Index::seek_point`].
    ///
    /// This version has relaxed bounds,
    /// where relaxed means that the key type can be any `Q`
    /// that the index's key type borrows as,
    /// and need not be `Index::Key` itself.
    /// This allows e.g., queries with `&str` rather than providing an owned string key.
    /// This can be exploited to avoid heap allocations in some situations,
    /// e.g., borrowing the input directly from BSATN.
    /// This is similar to the bounds on [`BTreeMap::get`].
    pub fn seek_point<Q>(&self, point: &Q) -> <Self as Index>::PointIter<'_>
    where
        Q: ?Sized + Ord,
        <Self as Index>::Key: Borrow<Q>,
    {
        // Yields all row pointers stored under `point`, or an empty iterator.
        same_key_iter(self.map.get(point))
    }
}
impl<K: Ord + KeySize> RangedIndex for BTreeIndex<K> {
type RangeIter<'a>
= BTreeIndexRangeIter<'a, K>
@@ -127,6 +164,24 @@ impl<K: Ord + KeySize> RangedIndex for BTreeIndex<K> {
/// Returns an iterator over the multimap that yields all the `V`s
/// of the `K`s that fall within the specified `range`.
fn seek_range(&self, range: &impl RangeBounds<Self::Key>) -> Self::RangeIter<'_> {
self.seek_range(range)
}
}
impl<K: KeySize + Ord> BTreeIndex<K> {
/// See [`RangedIndex::seek_range`].
///
/// This version has relaxed bounds
/// where relaxed means that the key type can be borrowed from the index's key type
/// and need not be `Index::Key` itself.
/// This allows e.g., queries with `&str` rather than providing an owned string key.
/// This can be exploited to avoid heap allocations in some situations,
/// e.g., borrowing the input directly from BSATN.
/// This is similar to the bounds on [`BTreeMap::range`].
pub fn seek_range<Q: ?Sized + Ord>(&self, range: &impl RangeBounds<Q>) -> <Self as RangedIndex>::RangeIter<'_>
where
<Self as Index>::Key: Borrow<Q>,
{
BTreeIndexRangeIter {
outer: self.map.range((range.start_bound(), range.end_bound())),
inner: SameKeyEntry::empty_iter(),
+239
View File
@@ -0,0 +1,239 @@
use super::{DecodeResult, RowRef};
use crate::indexes::RowPointer;
use core::mem;
use spacetimedb_memory_usage::MemoryUsage;
use spacetimedb_primitives::ColList;
use spacetimedb_sats::bsatn::{DecodeError, Deserializer, Serializer};
use spacetimedb_sats::de::{DeserializeSeed, Error as _};
use spacetimedb_sats::{u256, AlgebraicType, AlgebraicValue, ProductTypeElement, Serialize as _, WithTypespace};
/// A key for an all-primitive multi-column index
/// serialized to a byte array.
///
/// The key can store up to `N` bytes
/// where `N` is determined by the summed size of each column in the index
/// when serialized in BSATN format,
/// which is the same as little-endian encoding of the keys for primitive types.
///
/// As we cannot have too many different `N`s,
/// we have a few `N`s, where each is a power of 2.
/// A key is then padded with zeroes to the nearest `N`.
/// For example, a key `(x: u8, y: u16, z: u32)` for a 3-column index
/// would have `N = 1 + 2 + 4 = 7` but would be padded to `N = 8`.
///
/// The derived `Ord`/`Eq` compare the raw bytes, including padding;
/// keys are only compared against keys built for the same index/type.
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy)]
pub(super) struct BytesKey<const N: usize>([u8; N]);

// `BytesKey` is a plain inline byte array with no heap data,
// so the trait's default (zero extra usage) implementation suffices —
// NOTE(review): confirm that is what the empty impl provides.
impl<const N: usize> MemoryUsage for BytesKey<N> {}
/// A difference between btree indices and hash indices
/// is that the former store keys and values separately,
/// i.e., as `([K], [RowPointer])`,
/// whereas hash indices store them together,
/// i.e., as `([K, RowPointer])`.
///
/// For hash indices, it's therefore profitable to ensure
/// that the key and the value together fit into an `N` that is a power of 2,
/// as such an `N` is well aligned around cache line sizes.
///
/// This returns `n` minus the size of a [`RowPointer`],
/// i.e., the key budget left after reserving room for the pointer.
pub(super) const fn size_sub_row_pointer(n: usize) -> usize {
    let ptr_size = mem::size_of::<RowPointer>();
    n - ptr_size
}
/// Returns the number of bytes required at most to store a key at `ty`
/// when serialized in BSATN format.
///
/// If keys at `ty` are incompatible with fixed byte keys,
/// e.g., because they are of unbounded length,
/// then `None` is returned.
pub(super) fn required_bytes_key_size(ty: &AlgebraicType) -> Option<usize> {
    use AlgebraicType::*;
    match ty {
        Ref(_) => unreachable!("should not have references at this point"),
        // Unbounded-length types cannot fit into a fixed byte key.
        String | Array(_) => None,
        // For a sum, report its greatest possible fixed size plus one byte
        // for the tag (a u8 in BSATN). A key may be of variable size,
        // as long as it fits within this upper bound.
        Sum(sum) => sum
            .variants
            .iter()
            .map(|var| required_bytes_key_size(&var.algebraic_type))
            .try_fold(0usize, |max, size| Some(max.max(size?)))
            .map(|max| 1 + max),
        // For a product, report the sum of the fixed sizes of its elements;
        // any element of unbounded size makes the whole product incompatible.
        Product(prod) => prod
            .elements
            .iter()
            .map(|elem| required_bytes_key_size(&elem.algebraic_type))
            .sum(),
        // Primitives: their BSATN encoding is exactly their byte width.
        Bool | U8 | I8 => Some(mem::size_of::<u8>()),
        U16 | I16 => Some(mem::size_of::<u16>()),
        U32 | I32 | F32 => Some(mem::size_of::<u32>()),
        U64 | I64 | F64 => Some(mem::size_of::<u64>()),
        U128 | I128 => Some(mem::size_of::<u128>()),
        U256 | I256 => Some(mem::size_of::<u256>()),
    }
}
impl<const N: usize> BytesKey<N> {
    /// Decodes `self` as an [`AlgebraicValue`] at `key_type`.
    ///
    /// An incorrect `key_type`,
    /// i.e., one other than what was used when the index was created,
    /// may lead to a panic, but this is not guaranteed.
    /// The method could also silently succeed
    /// if the passed `key_type` incidentally happens to be compatible with the stored bytes in `self`.
    pub(super) fn decode_algebraic_value(&self, key_type: &AlgebraicType) -> AlgebraicValue {
        // Construction-time BSATN validation (see `from_bsatn*`) justifies the `expect`.
        AlgebraicValue::decode(key_type, &mut self.0.as_slice())
            .expect("A `BytesKey` should by construction always deserialize to the right `key_type`")
    }

    /// Ensure bytes of length `got` fit in `N` or return an error.
    fn ensure_key_fits(got: usize) -> DecodeResult<()> {
        if got > N {
            return Err(DecodeError::custom(format_args!(
                "key provided is too long, expected at most {N}, but got {got}"
            )));
        }
        Ok(())
    }

    /// Decodes `prefix` and `endpoint` in BSATN to a [`BytesKey<N>`]
    /// by copying over both if they fit into the key.
    ///
    /// `prefix` must be valid BSATN at `prefix_types`
    /// and `endpoint` must be valid BSATN at `range_type`,
    /// or a decode error is returned.
    pub(super) fn from_bsatn_prefix_and_endpoint(
        prefix: &[u8],
        prefix_types: &[ProductTypeElement],
        endpoint: &[u8],
        range_type: &AlgebraicType,
    ) -> DecodeResult<Self> {
        // Validate the BSATN.
        //
        // The BSATN can originate from untrusted sources, e.g., from module code.
        // This also means that a `BytesKey` can be trusted to hold valid BSATN
        // for the key type, which we can rely on in e.g., `decode_algebraic_value`,
        // which isn't used in a context where it would be appropriate to fail.
        //
        // Another reason to validate is that we wish for `BytesKey` to be strictly
        // an optimization and not allow things that would be rejected by the non-optimized code.
        WithTypespace::empty(prefix_types).validate(Deserializer::new(&mut { prefix }))?;
        WithTypespace::empty(range_type).validate(Deserializer::new(&mut { endpoint }))?;

        // Check that the `prefix` and the `endpoint` together fit into the key.
        let prefix_len = prefix.len();
        let endpoint_len = endpoint.len();
        Self::ensure_key_fits(prefix_len + endpoint_len)?;

        // Copy the `prefix` and the `endpoint` over; the tail stays zero-padded.
        let mut arr = [0; N];
        arr[..prefix_len].copy_from_slice(prefix);
        arr[prefix_len..prefix_len + endpoint_len].copy_from_slice(endpoint);
        Ok(Self(arr))
    }

    /// Decodes `bytes` in BSATN to a [`BytesKey<N>`]
    /// by copying over the bytes if they fit into the key.
    ///
    /// `bytes` must be valid BSATN at `ty`, or a decode error is returned.
    pub(super) fn from_bsatn(ty: &AlgebraicType, bytes: &[u8]) -> DecodeResult<Self> {
        // Validate the BSATN. See `Self::from_bsatn_prefix_and_endpoint` for more details.
        WithTypespace::empty(ty).validate(Deserializer::new(&mut { bytes }))?;

        // Check that the `bytes` fit into the key.
        let got = bytes.len();
        Self::ensure_key_fits(got)?;

        // Copy the bytes over; the tail stays zero-padded.
        let mut arr = [0; N];
        arr[..got].copy_from_slice(bytes);
        Ok(Self(arr))
    }

    /// Serializes the columns `cols` in `row_ref` to a [`BytesKey<N>`].
    ///
    /// It's assumed that `row_ref` projected to `cols`
    /// will fit into `N` bytes when serialized into BSATN.
    /// The method panics otherwise (the `&mut [u8]` sink errors when full).
    ///
    /// # Safety
    ///
    /// Any `col` in `cols` is in-bounds of `row_ref`'s layout.
    pub(super) unsafe fn from_row_ref(cols: &ColList, row_ref: RowRef<'_>) -> Self {
        let mut arr = [0; N];
        let mut sink = arr.as_mut_slice();
        let ser = Serializer::new(&mut sink);
        // SAFETY: forwarded — caller promised every `col` is in-bounds of `row_ref`'s layout.
        unsafe { row_ref.serialize_columns_unchecked(cols, ser) }
            .expect("should've serialized a `row_ref` to BSATN successfully");
        Self(arr)
    }

    /// Serializes `av` to a [`BytesKey<N>`].
    ///
    /// It's assumed that `av`
    /// will fit into `N` bytes when serialized into BSATN.
    /// The method panics otherwise (the `&mut [u8]` sink errors when full).
    pub(super) fn from_algebraic_value(av: &AlgebraicValue) -> Self {
        let mut arr = [0; N];
        let mut sink = arr.as_mut_slice();
        let ser = Serializer::new(&mut sink);
        av.serialize_into_bsatn(ser)
            .expect("should've serialized an `AlgebraicValue` to BSATN successfully");
        Self(arr)
    }
}
/// Property tests for [`BytesKey`]: round-tripping through BSATN and
/// (for posterity) a note on why byte order is not value order.
#[cfg(test)]
mod test {
    use super::*;
    use proptest::prelude::*;
    use spacetimedb_sats::bsatn::to_len;
    use spacetimedb_sats::proptest::generate_typed_row;

    // Large enough that most generated rows fit; oversized rows are rejected
    // via `prop_assume!` below rather than failing.
    const N: usize = 4096;

    proptest! {
        // Encoding a value into a `BytesKey` and decoding it back at the
        // same type must yield the original value.
        #[test]
        fn test_bytes_key_round_trip((ty, av) in generate_typed_row()) {
            let len = to_len(&av).unwrap();
            prop_assume!(len <= N);
            let ty = AlgebraicType::Product(ty);
            let av = AlgebraicValue::Product(av);
            let key = BytesKey::<N>::from_algebraic_value(&av);
            let decoded_av = key.decode_algebraic_value(&ty);
            assert_eq!(av, decoded_av);
        }

        /*
        // This test turned out not to hold for integers larger than u8,
        // as BSATN stores them little-endian,
        // but `Ord for AlgebraicValue` compares them as big-endian.
        // It's included here for posterity and in case we'd like to
        // massage the BSATN before storing it in the `BytesKey`
        // to make it order-preserving.
        use proptest::array::uniform;
        use spacetimedb_sats::proptest::{gen_with, generate_product_value, generate_row_type, SIZE};
        #[test]
        fn order_in_bsatn_is_preserved((ty, [r1, r2]) in gen_with(generate_row_type(0..=SIZE), |ty| uniform(generate_product_value(ty)))) {
            let ty: AlgebraicType = ty.into();
            let r1: AlgebraicValue = r1.into();
            let r2: AlgebraicValue = r2.into();
            let Some(required) = required_bytes_key_size(&ty, true) else {
                //dbg!(&ty);
                return Err(TestCaseError::reject("type is incompatible with fixed byte keys in range indices"));
            };
            prop_assume!(required <= N);
            let k1 = BytesKey::<N>::from_algebraic_value(&r1);
            let k2 = BytesKey::<N>::from_algebraic_value(&r2);
            let ord_k = k1.cmp(&k2);
            let ord_r = r1.cmp(&r2);
            prop_assert_eq!(ord_k, ord_r);
        }
        */
    }
}
+55 -19
View File
@@ -4,6 +4,7 @@ use super::{
Index, KeySize,
};
use crate::indexes::RowPointer;
use core::borrow::Borrow;
use core::hash::Hash;
use spacetimedb_data_structures::map::hash_map::EntryRef;
use spacetimedb_sats::memory_usage::MemoryUsage;
@@ -63,7 +64,7 @@ impl<K: KeySize + Eq + Hash> Index for HashIndex<K> {
/// and multimaps do not bind one `key` to the same `ptr`.
fn insert(&mut self, key: Self::Key, ptr: RowPointer) -> Result<(), RowPointer> {
self.num_rows += 1;
self.num_key_bytes.add_to_key_bytes::<Self>(&key);
self.num_key_bytes.add_to_key_bytes(&key);
self.map.entry(key).or_default().push(ptr);
Ok(())
}
@@ -72,22 +73,7 @@ impl<K: KeySize + Eq + Hash> Index for HashIndex<K> {
///
/// Returns whether `key -> ptr` was present.
fn delete(&mut self, key: &K, ptr: RowPointer) -> bool {
let EntryRef::Occupied(mut entry) = self.map.entry_ref(key) else {
return false;
};
let (deleted, is_empty) = entry.get_mut().delete(ptr);
if deleted {
self.num_rows -= 1;
self.num_key_bytes.sub_from_key_bytes::<Self>(key);
}
if is_empty {
entry.remove();
}
deleted
self.delete(key, ptr)
}
type PointIter<'a>
@@ -95,8 +81,8 @@ impl<K: KeySize + Eq + Hash> Index for HashIndex<K> {
where
Self: 'a;
fn seek_point(&self, key: &Self::Key) -> Self::PointIter<'_> {
same_key_iter(self.map.get(key))
fn seek_point(&self, point: &Self::Key) -> Self::PointIter<'_> {
self.seek_point(point)
}
fn num_keys(&self) -> usize {
@@ -120,3 +106,53 @@ impl<K: KeySize + Eq + Hash> Index for HashIndex<K> {
Ok(())
}
}
impl<K: KeySize + Eq + Hash> HashIndex<K> {
/// See [`Index::delete`].
///
/// This version has relaxed bounds
/// where relaxed means that the key type can be borrowed from the index's key type
/// and need not be `Index::Key` itself.
/// This allows e.g., queries with `&str` rather than providing an owned string key.
/// This can be exploited to avoid heap allocations in some situations,
/// e.g., borrowing the input directly from BSATN.
/// This is similar to the bounds on [`HashMap::remove`].
pub fn delete<Q>(&mut self, key: &Q, ptr: RowPointer) -> bool
where
Q: ?Sized + KeySize + Hash + Eq,
<Self as Index>::Key: Borrow<Q>,
{
let EntryRef::Occupied(mut entry) = self.map.entry_ref(key) else {
return false;
};
let (deleted, is_empty) = entry.get_mut().delete(ptr);
if deleted {
self.num_rows -= 1;
self.num_key_bytes.sub_from_key_bytes(entry.key());
}
if is_empty {
entry.remove();
}
deleted
}
/// See [`Index::seek_point`].
///
/// This version has relaxed bounds
/// where relaxed means that the key type can be borrowed from the index's key type
/// and need not be `Index::Key` itself.
/// This allows e.g., queries with `&str` rather than providing an owned string key.
/// This can be exploited to avoid heap allocations in some situations,
/// e.g., borrowing the input directly from BSATN.
/// This is similar to the bounds on [`HashMap::get`].
pub fn seek_point<Q: ?Sized + Eq + Hash>(&self, point: &Q) -> <Self as Index>::PointIter<'_>
where
<Self as Index>::Key: Borrow<Q>,
{
same_key_iter(self.map.get(point))
}
}
+27 -7
View File
@@ -1,3 +1,5 @@
use crate::table_index::BytesKey;
use super::Index;
use core::mem;
use spacetimedb_memory_usage::MemoryUsage;
@@ -9,10 +11,10 @@ use spacetimedb_sats::{
/// Storage for memoizing `KeySize` statistics.
pub trait KeyBytesStorage: Default + MemoryUsage {
/// Add `key.key_size_in_bytes()` to the statistics.
fn add_to_key_bytes<I: Index>(&mut self, key: &I::Key);
fn add_to_key_bytes(&mut self, key: &(impl KeySize + ?Sized));
/// Subtract `key.key_size_in_bytes()` from the statistics.
fn sub_from_key_bytes<I: Index>(&mut self, key: &I::Key);
fn sub_from_key_bytes(&mut self, key: &(impl KeySize + ?Sized));
/// Resets the statistics to zero.
fn reset_to_zero(&mut self);
@@ -22,8 +24,8 @@ pub trait KeyBytesStorage: Default + MemoryUsage {
}
impl KeyBytesStorage for () {
fn add_to_key_bytes<I: Index>(&mut self, _: &I::Key) {}
fn sub_from_key_bytes<I: Index>(&mut self, _: &I::Key) {}
fn add_to_key_bytes(&mut self, _: &(impl KeySize + ?Sized)) {}
fn sub_from_key_bytes(&mut self, _: &(impl KeySize + ?Sized)) {}
fn reset_to_zero(&mut self) {}
fn get<I: Index>(&self, index: &I) -> u64 {
index.num_keys() as u64 * mem::size_of::<I::Key>() as u64
@@ -31,10 +33,10 @@ impl KeyBytesStorage for () {
}
impl KeyBytesStorage for u64 {
fn add_to_key_bytes<I: Index>(&mut self, key: &I::Key) {
fn add_to_key_bytes(&mut self, key: &(impl KeySize + ?Sized)) {
*self += key.key_size_in_bytes() as u64;
}
fn sub_from_key_bytes<I: Index>(&mut self, key: &I::Key) {
fn sub_from_key_bytes(&mut self, key: &(impl KeySize + ?Sized)) {
*self -= key.key_size_in_bytes() as u64;
}
fn reset_to_zero(&mut self) {
@@ -79,6 +81,20 @@ pub trait KeySize {
}
}
impl<T: ?Sized + KeySize> KeySize for &T {
type MemoStorage = T::MemoStorage;
fn key_size_in_bytes(&self) -> usize {
(**self).key_size_in_bytes()
}
}
impl<T: ?Sized + KeySize> KeySize for Box<T> {
type MemoStorage = T::MemoStorage;
fn key_size_in_bytes(&self) -> usize {
(**self).key_size_in_bytes()
}
}
macro_rules! impl_key_size_primitive {
($prim:ty) => {
impl KeySize for $prim {
@@ -112,7 +128,7 @@ impl_key_size_primitive!(
F64,
);
impl KeySize for Box<str> {
impl KeySize for str {
type MemoStorage = u64;
fn key_size_in_bytes(&self) -> usize {
self.len()
@@ -202,3 +218,7 @@ impl KeySize for ArrayValue {
}
}
}
impl<const N: usize> KeySize for BytesKey<N> {
type MemoStorage = ();
}
File diff suppressed because it is too large Load Diff
@@ -1,6 +1,6 @@
use super::{Index, KeySize, RangedIndex};
use crate::{indexes::RowPointer, table_index::key_size::KeyBytesStorage};
use core::{ops::RangeBounds, option::IntoIter};
use core::{borrow::Borrow, ops::RangeBounds, option::IntoIter};
use spacetimedb_sats::memory_usage::MemoryUsage;
use std::collections::btree_map::{BTreeMap, Entry, Range};
@@ -41,7 +41,7 @@ impl<K: Ord + KeySize> Index for UniqueBTreeIndex<K> {
fn insert(&mut self, key: K, val: RowPointer) -> Result<(), RowPointer> {
match self.map.entry(key) {
Entry::Vacant(e) => {
self.num_key_bytes.add_to_key_bytes::<Self>(e.key());
self.num_key_bytes.add_to_key_bytes(e.key());
e.insert(val);
Ok(())
}
@@ -49,12 +49,8 @@ impl<K: Ord + KeySize> Index for UniqueBTreeIndex<K> {
}
}
fn delete(&mut self, key: &K, _: RowPointer) -> bool {
let ret = self.map.remove(key).is_some();
if ret {
self.num_key_bytes.sub_from_key_bytes::<Self>(key);
}
ret
fn delete(&mut self, key: &K, ptr: RowPointer) -> bool {
self.delete(key, ptr)
}
fn num_keys(&self) -> usize {
@@ -71,7 +67,7 @@ impl<K: Ord + KeySize> Index for UniqueBTreeIndex<K> {
Self: 'a;
fn seek_point(&self, point: &Self::Key) -> Self::PointIter<'_> {
UniquePointIter::new(self.map.get(point).copied())
self.seek_point(point)
}
/// Deletes all entries from the map, leaving it empty.
@@ -94,7 +90,47 @@ impl<K: Ord + KeySize> Index for UniqueBTreeIndex<K> {
}
}
/// An iterator over the potential value in a unique index for a given key.
impl<K: KeySize + Ord> UniqueBTreeIndex<K> {
/// See [`Index::delete`].
///
/// This version has relaxed bounds
/// where relaxed means that the key type can be borrowed from the index's key type
/// and need not be `Index::Key` itself.
/// This allows e.g., queries with `&str` rather than providing an owned string key.
/// This can be exploited to avoid heap allocations in some situations,
/// e.g., borrowing the input directly from BSATN.
/// This is similar to the bounds on [`BTreeMap::remove`].
pub fn delete<Q>(&mut self, key: &Q, _: RowPointer) -> bool
where
Q: ?Sized + KeySize + Ord,
<Self as Index>::Key: Borrow<Q>,
{
let ret = self.map.remove(key).is_some();
if ret {
self.num_key_bytes.sub_from_key_bytes(key);
}
ret
}
/// See [`Index::seek_point`].
///
/// This version has relaxed bounds
/// where relaxed means that the key type can be borrowed from the index's key type
/// and need not be `Index::Key` itself.
/// This allows e.g., queries with `&str` rather than providing an owned string key.
/// This can be exploited to avoid heap allocations in some situations,
/// e.g., borrowing the input directly from BSATN.
/// This is similar to the bounds on [`BTreeMap::get`].
pub fn seek_point<Q>(&self, point: &Q) -> <Self as Index>::PointIter<'_>
where
Q: ?Sized + Ord,
<Self as Index>::Key: Borrow<Q>,
{
UniquePointIter::new(self.map.get(point).copied())
}
}
/// An iterator over the potential value in a unique index for a given key.
pub struct UniquePointIter {
/// The iterator seeking for matching keys in the range.
pub(super) iter: IntoIter<RowPointer>,
@@ -123,6 +159,24 @@ impl<K: Ord + KeySize> RangedIndex for UniqueBTreeIndex<K> {
Self: 'a;
fn seek_range(&self, range: &impl RangeBounds<Self::Key>) -> Self::RangeIter<'_> {
self.seek_range(range)
}
}
impl<K: KeySize + Ord> UniqueBTreeIndex<K> {
/// See [`RangedIndex::seek_range`].
///
/// This version has relaxed bounds
/// where relaxed means that the key type can be borrowed from the index's key type
/// and need not be `Index::Key` itself.
/// This allows e.g., queries with `&str` rather than providing an owned string key.
/// This can be exploited to avoid heap allocations in some situations,
/// e.g., borrowing the input directly from BSATN.
/// This is similar to the bounds on [`BTreeMap::range`].
pub fn seek_range<Q: ?Sized + Ord>(&self, range: &impl RangeBounds<Q>) -> <Self as RangedIndex>::RangeIter<'_>
where
<Self as Index>::Key: Borrow<Q>,
{
UniqueBTreeIndexRangeIter {
iter: self.map.range((range.start_bound(), range.end_bound())),
}
@@ -1,6 +1,7 @@
use super::unique_btree_index::UniquePointIter;
use super::{Index, KeySize};
use crate::{indexes::RowPointer, table_index::key_size::KeyBytesStorage};
use core::borrow::Borrow;
use core::hash::Hash;
use spacetimedb_data_structures::map::hash_map::Entry;
use spacetimedb_sats::memory_usage::MemoryUsage;
@@ -46,7 +47,7 @@ impl<K: KeySize + Eq + Hash> Index for UniqueHashIndex<K> {
fn insert(&mut self, key: Self::Key, ptr: RowPointer) -> Result<(), RowPointer> {
match self.map.entry(key) {
Entry::Vacant(e) => {
self.num_key_bytes.add_to_key_bytes::<Self>(e.key());
self.num_key_bytes.add_to_key_bytes(e.key());
e.insert(ptr);
Ok(())
}
@@ -54,12 +55,8 @@ impl<K: KeySize + Eq + Hash> Index for UniqueHashIndex<K> {
}
}
fn delete(&mut self, key: &Self::Key, _: RowPointer) -> bool {
let ret = self.map.remove(key).is_some();
if ret {
self.num_key_bytes.sub_from_key_bytes::<Self>(key);
}
ret
fn delete(&mut self, key: &Self::Key, ptr: RowPointer) -> bool {
self.delete(key, ptr)
}
fn clear(&mut self) {
@@ -92,6 +89,45 @@ impl<K: KeySize + Eq + Hash> Index for UniqueHashIndex<K> {
Self: 'a;
fn seek_point(&self, point: &Self::Key) -> Self::PointIter<'_> {
self.seek_point(point)
}
}
impl<K: KeySize + Eq + Hash> UniqueHashIndex<K> {
/// See [`Index::delete`].
///
/// This version has relaxed bounds
/// where relaxed means that the key type can be borrowed from the index's key type
/// and need not be `Index::Key` itself.
/// This allows e.g., queries with `&str` rather than providing an owned string key.
/// This can be exploited to avoid heap allocations in some situations,
/// e.g., borrowing the input directly from BSATN.
/// This is similar to the bounds on [`HashMap::remove`].
pub fn delete<Q>(&mut self, key: &Q, _: RowPointer) -> bool
where
Q: ?Sized + KeySize + Hash + Eq,
<Self as Index>::Key: Borrow<Q>,
{
let ret = self.map.remove(key).is_some();
if ret {
self.num_key_bytes.sub_from_key_bytes(key);
}
ret
}
/// See [`Index::seek_point`].
///
/// This version has relaxed bounds
/// where relaxed means that the key type can be borrowed from the index's key type
/// and need not be `Index::Key` itself.
/// This allows e.g., queries with `&str` rather than providing an owned string key.
/// This can be exploited to avoid heap alloctions in some situations,
/// e.g., borrowing the input directly from BSATN.
/// This is similar to the bounds on [`HashMap::get`].
pub fn seek_point<Q: ?Sized + Eq + Hash>(&self, point: &Q) -> <Self as Index>::PointIter<'_>
where
<Self as Index>::Key: Borrow<Q>,
{
UniquePointIter::new(self.map.get(point).copied())
}
}