mirror of
https://github.com/clockworklabs/SpacetimeDB.git
synced 2026-05-10 01:30:37 -04:00
8e3af49f64
# Description of Changes Fixes https://github.com/clockworklabs/SpacetimeDB/issues/2824. Defines a global pool `BsatnRowListBuilderPool` which reclaims the buffers of a `ServerMessage<BsatnFormat>` and which is then used when building new `ServerMessage<BsatnFormat>`s. Notes: 1. The new pool `BsatnRowListBuilderPool` reports the same kind of metrics to prometheus as `PagePool` does. 2. `BsatnRowListBuilder` now works in terms of `BytesMut`. 3. The trait method `fn to_bsatn_extend` is redefined to be capable of dealing with `BytesMut` as well as `Vec<u8>`. 4. A trait `ConsumeEachBuffer` is defined from `ServerMessage<BsatnFormat>` and down to extract buffers. `<ServerMessage<_> as ConsumeEachBuffer>::consume_each_buffer(...)` is then called in `messages::serialize(...)` just after bsatn-encoding the entire message and before any compression is done. This is the place where the pool reclaims buffers. # Benchmarks Benchmark numbers vs. master using `cargo bench --bench subscription -- --baseline subs` on i7-7700K, 64GB RAM: ``` footprint-scan time: [21.607 ms 21.873 ms 22.187 ms] change: [-62.090% -61.438% -60.787%] (p = 0.00 < 0.05) Performance has improved. full-scan time: [22.185 ms 22.245 ms 22.324 ms] change: [-36.884% -36.497% -36.166%] (p = 0.00 < 0.05) Performance has improved. ``` The improvements in `footprint-scan` are mostly thanks to https://github.com/clockworklabs/SpacetimeDB/pull/2918, but 7 ms of the improvements here are thanks to the pool. The improvements to `full-scan` should be only thanks to the pool. # API and ABI breaking changes None # Expected complexity level and risk 2? # Testing - Tests for `Pool<T>` also apply to `BsatnRowListBuilderPool`.
242 lines
6.7 KiB
Rust
242 lines
6.7 KiB
Rust
use anyhow::Result;
|
|
use core::hash::{Hash, Hasher};
|
|
use core::ops::RangeBounds;
|
|
use spacetimedb_lib::query::Delta;
|
|
use spacetimedb_physical_plan::plan::{ProjectField, TupleField};
|
|
use spacetimedb_primitives::{ColList, IndexId, TableId};
|
|
use spacetimedb_sats::bsatn::{BufReservedFill, EncodeError, ToBsatn};
|
|
use spacetimedb_sats::buffer::BufWriter;
|
|
use spacetimedb_sats::product_value::InvalidFieldError;
|
|
use spacetimedb_sats::{impl_serialize, AlgebraicValue, ProductValue};
|
|
use spacetimedb_table::{static_assert_size, table::RowRef};
|
|
|
|
pub mod dml;
|
|
pub mod pipelined;
|
|
|
|
pub trait Datastore {
|
|
/// Iterator type for table scans
|
|
type TableIter<'a>: Iterator<Item = RowRef<'a>> + 'a
|
|
where
|
|
Self: 'a;
|
|
|
|
/// Iterator type for ranged index scans.
|
|
type RangeIndexIter<'a>: Iterator<Item = RowRef<'a>> + 'a
|
|
where
|
|
Self: 'a;
|
|
|
|
/// Iterator type for point index scans.
|
|
type PointIndexIter<'a>: Iterator<Item = RowRef<'a>> + 'a
|
|
where
|
|
Self: 'a;
|
|
|
|
/// Returns the number of rows in this table
|
|
fn row_count(&self, table_id: TableId) -> u64;
|
|
|
|
/// Scans and returns all of the rows in a table
|
|
fn table_scan<'a>(&'a self, table_id: TableId) -> Result<Self::TableIter<'a>>;
|
|
|
|
/// Scans a range of keys from an index returning a [`RowRef`] iterator.
|
|
fn index_scan_range<'a>(
|
|
&'a self,
|
|
table_id: TableId,
|
|
index_id: IndexId,
|
|
range: &impl RangeBounds<AlgebraicValue>,
|
|
) -> Result<Self::RangeIndexIter<'a>>;
|
|
|
|
/// Scans a key from an index returning a [`RowRef`] iterator.
|
|
fn index_scan_point<'a>(
|
|
&'a self,
|
|
table_id: TableId,
|
|
index_id: IndexId,
|
|
point: &AlgebraicValue,
|
|
) -> Result<Self::PointIndexIter<'a>>;
|
|
}
|
|
|
|
pub trait DeltaStore {
|
|
fn num_inserts(&self, table_id: TableId) -> usize;
|
|
fn num_deletes(&self, table_id: TableId) -> usize;
|
|
|
|
fn has_inserts(&self, table_id: TableId) -> bool {
|
|
self.num_inserts(table_id) != 0
|
|
}
|
|
|
|
fn has_deletes(&self, table_id: TableId) -> bool {
|
|
self.num_deletes(table_id) != 0
|
|
}
|
|
|
|
fn inserts_for_table(&self, table_id: TableId) -> Option<std::slice::Iter<'_, ProductValue>>;
|
|
fn deletes_for_table(&self, table_id: TableId) -> Option<std::slice::Iter<'_, ProductValue>>;
|
|
|
|
fn index_scan_range_for_delta(
|
|
&self,
|
|
table_id: TableId,
|
|
index_id: IndexId,
|
|
delta: Delta,
|
|
range: impl RangeBounds<AlgebraicValue>,
|
|
) -> impl Iterator<Item = Row<'_>>;
|
|
|
|
fn index_scan_point_for_delta(
|
|
&self,
|
|
table_id: TableId,
|
|
index_id: IndexId,
|
|
delta: Delta,
|
|
point: &AlgebraicValue,
|
|
) -> impl Iterator<Item = Row<'_>>;
|
|
|
|
fn delta_scan(&self, table_id: TableId, inserts: bool) -> DeltaScanIter<'_> {
|
|
match inserts {
|
|
true => DeltaScanIter {
|
|
iter: self.inserts_for_table(table_id),
|
|
},
|
|
false => DeltaScanIter {
|
|
iter: self.deletes_for_table(table_id),
|
|
},
|
|
}
|
|
}
|
|
}
|
|
|
|
#[derive(Clone)]
|
|
pub enum Row<'a> {
|
|
Ptr(RowRef<'a>),
|
|
Ref(&'a ProductValue),
|
|
}
|
|
|
|
impl PartialEq for Row<'_> {
|
|
fn eq(&self, other: &Self) -> bool {
|
|
match (self, other) {
|
|
(Self::Ptr(x), Self::Ptr(y)) => x == y,
|
|
(Self::Ref(x), Self::Ref(y)) => x == y,
|
|
(Self::Ptr(x), Self::Ref(y)) => x == *y,
|
|
(Self::Ref(x), Self::Ptr(y)) => y == *x,
|
|
}
|
|
}
|
|
}
|
|
|
|
impl Eq for Row<'_> {}
|
|
|
|
impl Hash for Row<'_> {
|
|
fn hash<H: Hasher>(&self, state: &mut H) {
|
|
match self {
|
|
Self::Ptr(x) => x.hash(state),
|
|
Self::Ref(x) => x.hash(state),
|
|
}
|
|
}
|
|
}
|
|
|
|
impl Row<'_> {
|
|
pub fn to_product_value(&self) -> ProductValue {
|
|
match self {
|
|
Self::Ptr(ptr) => ptr.to_product_value(),
|
|
Self::Ref(val) => (*val).clone(),
|
|
}
|
|
}
|
|
|
|
pub fn project_product(self, cols: &ColList) -> Result<ProductValue, InvalidFieldError> {
|
|
match self {
|
|
Self::Ptr(ptr) => ptr.project_product(cols),
|
|
Self::Ref(val) => val.project_product(cols),
|
|
}
|
|
}
|
|
}
|
|
|
|
impl_serialize!(['a] Row<'a>, (self, ser) => match self {
|
|
Self::Ptr(row) => row.serialize(ser),
|
|
Self::Ref(row) => row.serialize(ser),
|
|
});
|
|
|
|
impl ToBsatn for Row<'_> {
|
|
fn static_bsatn_size(&self) -> Option<u16> {
|
|
match self {
|
|
Self::Ptr(ptr) => ptr.static_bsatn_size(),
|
|
Self::Ref(val) => val.static_bsatn_size(),
|
|
}
|
|
}
|
|
|
|
fn to_bsatn_extend(&self, buf: &mut (impl BufWriter + BufReservedFill)) -> std::result::Result<(), EncodeError> {
|
|
match self {
|
|
Self::Ptr(ptr) => ptr.to_bsatn_extend(buf),
|
|
Self::Ref(val) => val.to_bsatn_extend(buf),
|
|
}
|
|
}
|
|
|
|
fn to_bsatn_vec(&self) -> std::result::Result<Vec<u8>, EncodeError> {
|
|
match self {
|
|
Self::Ptr(ptr) => ptr.to_bsatn_vec(),
|
|
Self::Ref(val) => val.to_bsatn_vec(),
|
|
}
|
|
}
|
|
}
|
|
|
|
impl ProjectField for Row<'_> {
|
|
fn project(&self, field: &TupleField) -> AlgebraicValue {
|
|
match self {
|
|
Self::Ptr(ptr) => ptr.project(field),
|
|
Self::Ref(val) => val.project(field),
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Each query operator returns a tuple of [RowRef]s
|
|
#[derive(Clone)]
|
|
pub enum Tuple<'a> {
|
|
/// A pointer to a row in a base table
|
|
Row(Row<'a>),
|
|
/// A temporary returned by a join operator
|
|
Join(Vec<Row<'a>>),
|
|
}
|
|
|
|
static_assert_size!(Tuple, 40);
|
|
|
|
impl ProjectField for Tuple<'_> {
|
|
fn project(&self, field: &TupleField) -> AlgebraicValue {
|
|
match self {
|
|
Self::Row(row) => row.project(field),
|
|
Self::Join(ptrs) => field
|
|
.label_pos
|
|
.and_then(|i| ptrs.get(i))
|
|
.map(|ptr| ptr.project(field))
|
|
.unwrap(),
|
|
}
|
|
}
|
|
}
|
|
|
|
impl<'a> Tuple<'a> {
|
|
/// Select the tuple element at position `i`
|
|
fn select(self, i: usize) -> Option<Row<'a>> {
|
|
match self {
|
|
Self::Row(_) => None,
|
|
Self::Join(mut ptrs) => Some(ptrs.swap_remove(i)),
|
|
}
|
|
}
|
|
|
|
/// Append a [Row] to a tuple
|
|
fn append(self, ptr: Row<'a>) -> Self {
|
|
match self {
|
|
Self::Row(row) => Self::Join(vec![row, ptr]),
|
|
Self::Join(mut rows) => {
|
|
rows.push(ptr);
|
|
Self::Join(rows)
|
|
}
|
|
}
|
|
}
|
|
|
|
fn join(self, with: Self) -> Self {
|
|
match with {
|
|
Self::Row(ptr) => self.append(ptr),
|
|
Self::Join(ptrs) => ptrs.into_iter().fold(self, |tup, ptr| tup.append(ptr)),
|
|
}
|
|
}
|
|
}
|
|
|
|
pub struct DeltaScanIter<'a> {
|
|
iter: Option<std::slice::Iter<'a, ProductValue>>,
|
|
}
|
|
|
|
impl<'a> Iterator for DeltaScanIter<'a> {
|
|
type Item = &'a ProductValue;
|
|
|
|
fn next(&mut self) -> Option<Self::Item> {
|
|
self.iter.as_mut().and_then(|iter| iter.next())
|
|
}
|
|
}
|