mirror of
https://github.com/clockworklabs/SpacetimeDB.git
synced 2026-05-06 07:26:43 -04:00
feat(296): index range iterator (#303)
Fixes #296. The previous btree index iterator returned entries for a single key. This change set adds an iterator for returning a range of values.
This commit is contained in:
@@ -122,7 +122,7 @@ impl BTreeIndex {
|
||||
/// Returns an iterator over the [BTreeIndex] that yields all the `RowId`s
|
||||
/// that fall within the specified `range`.
|
||||
#[tracing::instrument(skip_all)]
|
||||
pub(crate) fn scan_range(&self, range: impl RangeBounds<AlgebraicValue>) -> BTreeIndexRangeIter {
|
||||
pub(crate) fn seek<'a>(&'a self, range: &impl RangeBounds<AlgebraicValue>) -> BTreeIndexRangeIter<'a> {
|
||||
let map = |bound, datakey| match bound {
|
||||
Bound::Included(x) => Bound::Included(IndexKey::from_row(x, datakey)),
|
||||
Bound::Excluded(x) => Bound::Excluded(IndexKey::from_row(x, datakey)),
|
||||
@@ -135,21 +135,6 @@ impl BTreeIndex {
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns an iterator over the [BTreeIndex] that yields all the `RowId`s
|
||||
/// that match the specified `value` in the indexed column.
|
||||
///
|
||||
/// Matches is defined by `Ord for AlgebraicValue`.
|
||||
///
|
||||
/// For a unique index this will always yield at most one `RowId`.
|
||||
#[tracing::instrument(skip_all)]
|
||||
pub(crate) fn seek<'a>(&'a self, value: &AlgebraicValue) -> BTreeIndexRangeIter<'a> {
|
||||
let k_start = IndexKey::from_row(value, DataKey::min_datakey());
|
||||
let k_end = IndexKey::from_row(value, DataKey::max_datakey());
|
||||
BTreeIndexRangeIter {
|
||||
range_iter: self.idx.range(k_start..k_end),
|
||||
}
|
||||
}
|
||||
|
||||
/// Construct the [BTreeIndex] from the rows.
|
||||
#[tracing::instrument(skip_all)]
|
||||
pub(crate) fn build_from_rows<'a>(&mut self, rows: impl Iterator<Item = &'a ProductValue>) -> Result<(), DBError> {
|
||||
|
||||
@@ -207,10 +207,10 @@ impl CommittedState {
|
||||
&'a self,
|
||||
table_id: &TableId,
|
||||
col_id: &ColId,
|
||||
value: &AlgebraicValue,
|
||||
range: &impl RangeBounds<AlgebraicValue>,
|
||||
) -> Option<BTreeIndexRangeIter<'a>> {
|
||||
if let Some(table) = self.tables.get(table_id) {
|
||||
table.index_seek(*col_id, value)
|
||||
table.index_seek(*col_id, range)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
@@ -321,9 +321,9 @@ impl TxState {
|
||||
&'a self,
|
||||
table_id: &TableId,
|
||||
col_id: &ColId,
|
||||
value: &AlgebraicValue,
|
||||
range: &impl RangeBounds<AlgebraicValue>,
|
||||
) -> Option<BTreeIndexRangeIter<'a>> {
|
||||
self.insert_tables.get(table_id)?.index_seek(*col_id, value)
|
||||
self.insert_tables.get(table_id)?.index_seek(*col_id, range)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -824,10 +824,11 @@ impl Inner {
|
||||
// TODO(george): As part of the bootstrapping process, we add a bunch of rows
|
||||
// and only at very end do we patch things up and create table metadata, indexes,
|
||||
// and so on. Early parts of that process insert rows, and need the schema to do
|
||||
// so. We can't just call iter_by_col_eq here as that would attempt to use the
|
||||
// so. We can't just call `iter_by_col_range` here as that would attempt to use the
|
||||
// index which we haven't created yet. So instead we just manually Scan here.
|
||||
let rows = IterByColEq::Scan(ScanIterByColEq {
|
||||
value: table_id.into(),
|
||||
let value: AlgebraicValue = table_id.into();
|
||||
let rows = IterByColRange::Scan(ScanIterByColRange {
|
||||
range: value,
|
||||
col_id: table_id_col,
|
||||
scan_iter: self.iter(&ST_TABLES_ID)?,
|
||||
})
|
||||
@@ -1462,29 +1463,33 @@ impl Inner {
|
||||
Err(TableError::IdNotFound(table_id.0).into())
|
||||
}
|
||||
|
||||
fn iter_by_col_range<'a, R: std::ops::RangeBounds<spacetimedb_sats::AlgebraicValue>>(
|
||||
/// Returns an iterator,
|
||||
/// yielding every row in the table identified by `table_id`,
|
||||
/// where the column data identified by `col_id` equates to `value`.
|
||||
fn iter_by_col_eq(
|
||||
&self,
|
||||
table_id: &TableId,
|
||||
col_id: &ColId,
|
||||
value: AlgebraicValue,
|
||||
) -> super::Result<IterByColEq<'_>> {
|
||||
self.iter_by_col_range(table_id, col_id, value)
|
||||
}
|
||||
|
||||
/// Returns an iterator,
|
||||
/// yielding every row in the table identified by `table_id`,
|
||||
/// where the values of `col_id` are contained in `range`.
|
||||
fn iter_by_col_range<'a, R: RangeBounds<AlgebraicValue>>(
|
||||
&'a self,
|
||||
table_id: &TableId,
|
||||
col_id: &ColId,
|
||||
range: R,
|
||||
) -> super::Result<IterByColRange<'a, R>> {
|
||||
Ok(IterByColRange::Scan(ScanIterByColRange {
|
||||
range,
|
||||
scan_iter: self.iter(table_id)?,
|
||||
col_id: *col_id,
|
||||
}))
|
||||
}
|
||||
|
||||
/// Returns an iterator,
|
||||
/// yielding every row in the table identified by `table_id`,
|
||||
/// where the column data identified by `col_id` equates to `value`.
|
||||
fn iter_by_col_eq(&self, table_id: &TableId, col_id: &ColId, value: AlgebraicValue) -> super::Result<IterByColEq> {
|
||||
// We have to index_seek in both the committed state and the current tx state.
|
||||
// First, we will check modifications in the current tx. It may be that the table
|
||||
// has not been modified yet in the current tx, in which case we will only search
|
||||
// the committed state. Finally, the table may not be indexed at all, in which case
|
||||
// we fall back to iterating the entire table.
|
||||
|
||||
//
|
||||
// We need to check the tx_state first. In particular, it may be that the index
|
||||
// was only added in the current transaction.
|
||||
// TODO(george): It's unclear that we truly support dynamically creating an index
|
||||
@@ -1493,41 +1498,37 @@ impl Inner {
|
||||
if let Some(inserted_rows) = self
|
||||
.tx_state
|
||||
.as_ref()
|
||||
.and_then(|tx_state| tx_state.index_seek(table_id, col_id, &value))
|
||||
.and_then(|tx_state| tx_state.index_seek(table_id, col_id, &range))
|
||||
{
|
||||
// The current transaction has modified this table, and the table is indexed.
|
||||
let tx_state = self.tx_state.as_ref().unwrap();
|
||||
Ok(IterByColEq::Index(IndexIterByColEq {
|
||||
value: value.clone(),
|
||||
col_id: *col_id,
|
||||
iter: IndexSeekIterInner {
|
||||
table_id: *table_id,
|
||||
tx_state,
|
||||
inserted_rows,
|
||||
committed_rows: self.committed_state.index_seek(table_id, col_id, &value),
|
||||
committed_state: &self.committed_state,
|
||||
},
|
||||
Ok(IterByColRange::Index(IndexSeekIterInner {
|
||||
table_id: *table_id,
|
||||
tx_state,
|
||||
inserted_rows,
|
||||
committed_rows: self.committed_state.index_seek(table_id, col_id, &range),
|
||||
committed_state: &self.committed_state,
|
||||
}))
|
||||
} else {
|
||||
// Either the current transaction has not modified this table, or the table is not
|
||||
// indexed.
|
||||
match self.committed_state.index_seek(table_id, col_id, &value) {
|
||||
match self.committed_state.index_seek(table_id, col_id, &range) {
|
||||
//If we don't have `self.tx_state` yet is likely we are running the bootstrap process
|
||||
Some(committed_rows) => match self.tx_state.as_ref() {
|
||||
None => Ok(IterByColEq::Scan(ScanIterByColEq {
|
||||
value,
|
||||
None => Ok(IterByColRange::Scan(ScanIterByColRange {
|
||||
range,
|
||||
col_id: *col_id,
|
||||
scan_iter: self.iter(table_id)?,
|
||||
})),
|
||||
Some(tx_state) => Ok(IterByColEq::CommittedIndex(CommittedIndexIterByColEq {
|
||||
Some(tx_state) => Ok(IterByColRange::CommittedIndex(CommittedIndexIter {
|
||||
table_id: *table_id,
|
||||
tx_state,
|
||||
committed_state: &self.committed_state,
|
||||
committed_rows,
|
||||
})),
|
||||
},
|
||||
None => Ok(IterByColEq::Scan(ScanIterByColEq {
|
||||
value,
|
||||
None => Ok(IterByColRange::Scan(ScanIterByColRange {
|
||||
range,
|
||||
col_id: *col_id,
|
||||
scan_iter: self.iter(table_id)?,
|
||||
})),
|
||||
@@ -1760,75 +1761,7 @@ impl Iterator for Iter<'_> {
|
||||
}
|
||||
}
|
||||
|
||||
/// An iterator returned from `iter_by_col_eq`. This yields up all
|
||||
/// rows in a table which have a column with a particular value.
|
||||
pub enum IterByColEq<'a> {
|
||||
/// When the column in question does not have an index.
|
||||
Scan(ScanIterByColEq<'a>),
|
||||
|
||||
/// When the column has an index, and the table
|
||||
/// has been modified this transaction.
|
||||
Index(IndexIterByColEq<'a>),
|
||||
|
||||
/// When the column has an index, and the table
|
||||
/// has not been modified in this transaction.
|
||||
CommittedIndex(CommittedIndexIterByColEq<'a>),
|
||||
}
|
||||
|
||||
impl Iterator for IterByColEq<'_> {
|
||||
type Item = DataRef;
|
||||
|
||||
fn next(&mut self) -> Option<Self::Item> {
|
||||
match self {
|
||||
IterByColEq::Scan(seek) => seek.next(),
|
||||
IterByColEq::Index(seek) => seek.next(),
|
||||
IterByColEq::CommittedIndex(seek) => seek.next(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct ScanIterByColEq<'a> {
|
||||
scan_iter: Iter<'a>,
|
||||
col_id: ColId,
|
||||
value: AlgebraicValue,
|
||||
}
|
||||
|
||||
impl Iterator for ScanIterByColEq<'_> {
|
||||
type Item = DataRef;
|
||||
|
||||
#[tracing::instrument(skip_all)]
|
||||
fn next(&mut self) -> Option<Self::Item> {
|
||||
for data_ref in &mut self.scan_iter {
|
||||
let row = data_ref.view();
|
||||
let value = &row.elements[self.col_id.0 as usize];
|
||||
if &self.value == value {
|
||||
return Some(data_ref);
|
||||
}
|
||||
}
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
pub struct IndexIterByColEq<'a> {
|
||||
iter: IndexSeekIterInner<'a>,
|
||||
col_id: ColId,
|
||||
value: AlgebraicValue,
|
||||
}
|
||||
|
||||
impl Iterator for IndexIterByColEq<'_> {
|
||||
type Item = DataRef;
|
||||
|
||||
#[tracing::instrument(skip_all)]
|
||||
fn next(&mut self) -> Option<Self::Item> {
|
||||
self.iter.find(|data_ref| {
|
||||
let row = data_ref.view();
|
||||
let value = &row.elements[self.col_id.0 as usize];
|
||||
&self.value == value
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
struct IndexSeekIterInner<'a> {
|
||||
pub struct IndexSeekIterInner<'a> {
|
||||
table_id: TableId,
|
||||
tx_state: &'a TxState,
|
||||
committed_state: &'a CommittedState,
|
||||
@@ -1862,14 +1795,14 @@ impl Iterator for IndexSeekIterInner<'_> {
|
||||
}
|
||||
}
|
||||
|
||||
pub struct CommittedIndexIterByColEq<'a> {
|
||||
pub struct CommittedIndexIter<'a> {
|
||||
table_id: TableId,
|
||||
tx_state: &'a TxState,
|
||||
committed_state: &'a CommittedState,
|
||||
committed_rows: BTreeIndexRangeIter<'a>,
|
||||
}
|
||||
|
||||
impl Iterator for CommittedIndexIterByColEq<'_> {
|
||||
impl Iterator for CommittedIndexIter<'_> {
|
||||
type Item = DataRef;
|
||||
|
||||
fn next(&mut self) -> Option<Self::Item> {
|
||||
@@ -1896,9 +1829,21 @@ fn get_committed_row(state: &CommittedState, table_id: &TableId, row_id: &RowId)
|
||||
)
|
||||
}
|
||||
|
||||
/// An [IterByColRange] for an individual column value.
|
||||
pub type IterByColEq<'a> = IterByColRange<'a, AlgebraicValue>;
|
||||
|
||||
/// An iterator for a range of values in a column.
|
||||
pub enum IterByColRange<'a, R: RangeBounds<AlgebraicValue>> {
|
||||
/// When the column in question does not have an index.
|
||||
Scan(ScanIterByColRange<'a, R>),
|
||||
// TODO: Index(IndexRangeScanIter<'a>),
|
||||
|
||||
/// When the column has an index, and the table
|
||||
/// has been modified this transaction.
|
||||
Index(IndexSeekIterInner<'a>),
|
||||
|
||||
/// When the column has an index, and the table
|
||||
/// has not been modified in this transaction.
|
||||
CommittedIndex(CommittedIndexIter<'a>),
|
||||
}
|
||||
|
||||
impl<R: RangeBounds<AlgebraicValue>> Iterator for IterByColRange<'_, R> {
|
||||
@@ -1907,7 +1852,8 @@ impl<R: RangeBounds<AlgebraicValue>> Iterator for IterByColRange<'_, R> {
|
||||
fn next(&mut self) -> Option<Self::Item> {
|
||||
match self {
|
||||
IterByColRange::Scan(range) => range.next(),
|
||||
// TODO: RangeScanIter::Index(range) => range.next(),
|
||||
IterByColRange::Index(range) => range.next(),
|
||||
IterByColRange::CommittedIndex(seek) => seek.next(),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1935,14 +1881,14 @@ impl<R: RangeBounds<AlgebraicValue>> Iterator for ScanIterByColRange<'_, R> {
|
||||
|
||||
impl TxDatastore for Locking {
|
||||
type Iter<'a> = Iter<'a> where Self: 'a;
|
||||
type IterByColRange<'a, R: std::ops::RangeBounds<spacetimedb_sats::AlgebraicValue>> = IterByColRange<'a, R> where Self: 'a;
|
||||
type IterByColEq<'a> = IterByColEq<'a> where Self: 'a;
|
||||
type IterByColEq<'a> = IterByColRange<'a, AlgebraicValue> where Self: 'a;
|
||||
type IterByColRange<'a, R: RangeBounds<AlgebraicValue>> = IterByColRange<'a, R> where Self: 'a;
|
||||
|
||||
fn iter_tx<'a>(&'a self, tx: &'a Self::TxId, table_id: TableId) -> super::Result<Self::Iter<'a>> {
|
||||
self.iter_mut_tx(tx, table_id)
|
||||
}
|
||||
|
||||
fn iter_by_col_range_tx<'a, R: std::ops::RangeBounds<spacetimedb_sats::AlgebraicValue>>(
|
||||
fn iter_by_col_range_tx<'a, R: RangeBounds<AlgebraicValue>>(
|
||||
&'a self,
|
||||
tx: &'a Self::TxId,
|
||||
table_id: TableId,
|
||||
@@ -1957,7 +1903,7 @@ impl TxDatastore for Locking {
|
||||
tx: &'a Self::TxId,
|
||||
table_id: TableId,
|
||||
col_id: ColId,
|
||||
value: spacetimedb_sats::AlgebraicValue,
|
||||
value: AlgebraicValue,
|
||||
) -> super::Result<Self::IterByColEq<'a>> {
|
||||
self.iter_by_col_eq_mut_tx(tx, table_id, col_id, value)
|
||||
}
|
||||
@@ -2081,7 +2027,7 @@ impl MutTxDatastore for Locking {
|
||||
tx.lock.iter(&table_id)
|
||||
}
|
||||
|
||||
fn iter_by_col_range_mut_tx<'a, R: std::ops::RangeBounds<spacetimedb_sats::AlgebraicValue>>(
|
||||
fn iter_by_col_range_mut_tx<'a, R: RangeBounds<AlgebraicValue>>(
|
||||
&'a self,
|
||||
tx: &'a Self::MutTxId,
|
||||
table_id: TableId,
|
||||
@@ -2096,7 +2042,7 @@ impl MutTxDatastore for Locking {
|
||||
tx: &'a Self::MutTxId,
|
||||
table_id: TableId,
|
||||
col_id: ColId,
|
||||
value: spacetimedb_sats::AlgebraicValue,
|
||||
value: AlgebraicValue,
|
||||
) -> super::Result<Self::IterByColEq<'a>> {
|
||||
tx.lock.iter_by_col_eq(&table_id, &col_id, value)
|
||||
}
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
use super::{
|
||||
btree_index::{BTreeIndex, BTreeIndexIter, BTreeIndexRangeIter},
|
||||
btree_index::{BTreeIndex, BTreeIndexRangeIter},
|
||||
RowId,
|
||||
};
|
||||
use crate::db::datastore::traits::{ColId, TableSchema};
|
||||
@@ -56,24 +56,14 @@ impl Table {
|
||||
|
||||
/// When there's an index for `col_id`,
|
||||
/// returns an iterator over the [`BTreeIndex`] that yields all the `RowId`s
|
||||
/// that match the specified `value` in the indexed column.
|
||||
/// that match the specified `range` in the indexed column.
|
||||
///
|
||||
/// Matching is defined by `Ord for AlgebraicValue`.
|
||||
///
|
||||
/// For a unique index this will always yield at most one `RowId`.
|
||||
pub(crate) fn index_seek<'a>(&'a self, col_id: ColId, value: &AlgebraicValue) -> Option<BTreeIndexRangeIter<'a>> {
|
||||
self.indexes.get(&col_id).map(|index| index.seek(value))
|
||||
}
|
||||
|
||||
pub(crate) fn _index_scan(&self, col_id: ColId) -> BTreeIndexIter<'_> {
|
||||
self.indexes.get(&col_id).unwrap().scan()
|
||||
}
|
||||
|
||||
pub(crate) fn _index_range_scan(
|
||||
pub(crate) fn index_seek(
|
||||
&self,
|
||||
col_id: ColId,
|
||||
range: impl RangeBounds<AlgebraicValue>,
|
||||
) -> BTreeIndexRangeIter<'_> {
|
||||
self.indexes.get(&col_id).unwrap().scan_range(range)
|
||||
range: &impl RangeBounds<AlgebraicValue>,
|
||||
) -> Option<BTreeIndexRangeIter<'_>> {
|
||||
self.indexes.get(&col_id).map(|index| index.seek(range))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -478,7 +478,7 @@ impl RelationalDB {
|
||||
/// where the column data identified by `col_id` matches what is within `range`.
|
||||
///
|
||||
/// Matching is defined by `Ord for AlgebraicValue`.
|
||||
pub fn iter_by_col_range<'a, R: RangeBounds<AlgebraicValue> + 'a>(
|
||||
pub fn iter_by_col_range<'a, R: RangeBounds<AlgebraicValue>>(
|
||||
&'a self,
|
||||
tx: &'a MutTxId,
|
||||
table_id: u32,
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
pub mod de;
|
||||
pub mod ser;
|
||||
use std::collections::BTreeMap;
|
||||
use std::ops::{Bound, RangeBounds};
|
||||
|
||||
use crate::builtin_value::{F32, F64};
|
||||
use crate::{AlgebraicType, ArrayValue, BuiltinType, BuiltinValue, ProductValue, SumValue};
|
||||
@@ -446,6 +447,17 @@ impl<T: Into<AlgebraicValue>> From<Option<T>> for AlgebraicValue {
|
||||
}
|
||||
}
|
||||
|
||||
// An AlgebraicValue can be interpreted as a range containing a only the value itself.
|
||||
// This is useful for BTrees where single key scans are still viewed range scans.
|
||||
impl RangeBounds<AlgebraicValue> for AlgebraicValue {
|
||||
fn start_bound(&self) -> Bound<&AlgebraicValue> {
|
||||
Bound::Included(self)
|
||||
}
|
||||
fn end_bound(&self) -> Bound<&AlgebraicValue> {
|
||||
Bound::Included(self)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::collections::BTreeMap;
|
||||
|
||||
Reference in New Issue
Block a user