Files
SpacetimeDB/crates/execution/src/lib.rs
Mazdak Farrokhzad 8e3af49f64 Reuse buffers in ServerMessage<BsatnFormat> (#2911)
# Description of Changes

Fixes https://github.com/clockworklabs/SpacetimeDB/issues/2824.

Defines a global pool `BsatnRowListBuilderPool` which reclaims the
buffers of a `ServerMessage<BsatnFormat>` and which is then used when
building new `ServerMessage<BsatnFormat>`s.

Notes:
1. The new pool `BsatnRowListBuilderPool` reports the same kind of
metrics to prometheus as `PagePool` does.
2. `BsatnRowListBuilder` now works in terms of `BytesMut`.
3. The trait method `fn to_bsatn_extend` is redefined to be capable of
dealing with `BytesMut` as well as `Vec<u8>`.
4. A trait `ConsumeEachBuffer` is implemented for
`ServerMessage<BsatnFormat>` and the types nested within it, to extract their buffers.
`<ServerMessage<_> as ConsumeEachBuffer>::consume_each_buffer(...)` is
then called in `messages::serialize(...)` just after bsatn-encoding the
entire message and before any compression is done. This is the place
where the pool reclaims buffers.

# Benchmarks

Benchmark numbers vs. master using `cargo bench --bench subscription --
--baseline subs` on i7-7700K, 64GB RAM:

```
footprint-scan          time:   [21.607 ms 21.873 ms 22.187 ms]
                        change: [-62.090% -61.438% -60.787%] (p = 0.00 < 0.05)
                        Performance has improved.

full-scan               time:   [22.185 ms 22.245 ms 22.324 ms]
                        change: [-36.884% -36.497% -36.166%] (p = 0.00 < 0.05)
                        Performance has improved.
```

The improvements in `footprint-scan` are mostly thanks to
https://github.com/clockworklabs/SpacetimeDB/pull/2918, but 7 ms of the
improvements here are thanks to the pool. The improvements to
`full-scan` should be only thanks to the pool.

# API and ABI breaking changes

None

# Expected complexity level and risk

2?

# Testing

- Tests for `Pool<T>` also apply to `BsatnRowListBuilderPool`.
2025-12-18 23:02:36 +00:00

242 lines
6.7 KiB
Rust

use anyhow::Result;
use core::hash::{Hash, Hasher};
use core::ops::RangeBounds;
use spacetimedb_lib::query::Delta;
use spacetimedb_physical_plan::plan::{ProjectField, TupleField};
use spacetimedb_primitives::{ColList, IndexId, TableId};
use spacetimedb_sats::bsatn::{BufReservedFill, EncodeError, ToBsatn};
use spacetimedb_sats::buffer::BufWriter;
use spacetimedb_sats::product_value::InvalidFieldError;
use spacetimedb_sats::{impl_serialize, AlgebraicValue, ProductValue};
use spacetimedb_table::{static_assert_size, table::RowRef};
pub mod dml;
pub mod pipelined;
/// A source of table rows that can be read through full table scans and index scans.
///
/// Every scan yields [`RowRef`]s that borrow from `self`. The iterator types are
/// associated types, so each implementation names its own concrete iterators.
pub trait Datastore {
/// Iterator type returned by [`Datastore::table_scan`].
type TableIter<'a>: Iterator<Item = RowRef<'a>> + 'a
where
Self: 'a;
/// Iterator type returned by [`Datastore::index_scan_range`].
type RangeIndexIter<'a>: Iterator<Item = RowRef<'a>> + 'a
where
Self: 'a;
/// Iterator type returned by [`Datastore::index_scan_point`].
type PointIndexIter<'a>: Iterator<Item = RowRef<'a>> + 'a
where
Self: 'a;
/// Returns the number of rows in the table identified by `table_id`.
fn row_count(&self, table_id: TableId) -> u64;
/// Scans and returns all of the rows in the table identified by `table_id`.
///
/// # Errors
///
/// Returns an error if the implementation cannot produce the iterator;
/// the exact conditions are implementation-defined.
fn table_scan<'a>(&'a self, table_id: TableId) -> Result<Self::TableIter<'a>>;
/// Scans a range of keys from the index `index_id` on table `table_id`,
/// returning a [`RowRef`] iterator.
///
/// # Errors
///
/// Returns an error if the implementation cannot produce the iterator;
/// the exact conditions are implementation-defined.
fn index_scan_range<'a>(
&'a self,
table_id: TableId,
index_id: IndexId,
range: &impl RangeBounds<AlgebraicValue>,
) -> Result<Self::RangeIndexIter<'a>>;
/// Scans the single key `point` from the index `index_id` on table `table_id`,
/// returning a [`RowRef`] iterator.
///
/// # Errors
///
/// Returns an error if the implementation cannot produce the iterator;
/// the exact conditions are implementation-defined.
fn index_scan_point<'a>(
&'a self,
table_id: TableId,
index_id: IndexId,
point: &AlgebraicValue,
) -> Result<Self::PointIndexIter<'a>>;
}
/// Access to the per-table rows that were inserted and deleted (the table "deltas").
pub trait DeltaStore {
    /// Returns the number of inserted rows for `table_id`.
    fn num_inserts(&self, table_id: TableId) -> usize;

    /// Returns the number of deleted rows for `table_id`.
    fn num_deletes(&self, table_id: TableId) -> usize;

    /// Returns `true` when [`DeltaStore::num_inserts`] reports at least one row for `table_id`.
    fn has_inserts(&self, table_id: TableId) -> bool {
        self.num_inserts(table_id) > 0
    }

    /// Returns `true` when [`DeltaStore::num_deletes`] reports at least one row for `table_id`.
    fn has_deletes(&self, table_id: TableId) -> bool {
        self.num_deletes(table_id) > 0
    }

    /// Returns an iterator over the inserted rows of `table_id`.
    ///
    /// When `None` is returned is left to the implementation.
    fn inserts_for_table(&self, table_id: TableId) -> Option<std::slice::Iter<'_, ProductValue>>;

    /// Returns an iterator over the deleted rows of `table_id`.
    ///
    /// When `None` is returned is left to the implementation.
    fn deletes_for_table(&self, table_id: TableId) -> Option<std::slice::Iter<'_, ProductValue>>;

    /// Scans `range` on the index `index_id` of table `table_id` within a delta.
    ///
    /// NOTE(review): `delta` presumably selects whether inserted or deleted rows are
    /// scanned; confirm against the definition of [`Delta`].
    fn index_scan_range_for_delta(
        &self,
        table_id: TableId,
        index_id: IndexId,
        delta: Delta,
        range: impl RangeBounds<AlgebraicValue>,
    ) -> impl Iterator<Item = Row<'_>>;

    /// Scans the single key `point` on the index `index_id` of table `table_id` within a delta.
    ///
    /// NOTE(review): `delta` presumably selects whether inserted or deleted rows are
    /// scanned; confirm against the definition of [`Delta`].
    fn index_scan_point_for_delta(
        &self,
        table_id: TableId,
        index_id: IndexId,
        delta: Delta,
        point: &AlgebraicValue,
    ) -> impl Iterator<Item = Row<'_>>;

    /// Returns an iterator over one side of the delta of `table_id`:
    /// the inserted rows when `inserts` is `true`, the deleted rows otherwise.
    fn delta_scan(&self, table_id: TableId, inserts: bool) -> DeltaScanIter<'_> {
        let iter = if inserts {
            self.inserts_for_table(table_id)
        } else {
            self.deletes_for_table(table_id)
        };
        DeltaScanIter { iter }
    }
}
/// A row in one of two borrowed representations.
#[derive(Clone)]
pub enum Row<'a> {
/// A [`RowRef`] handle to the row.
Ptr(RowRef<'a>),
/// A reference to a [`ProductValue`] holding the row.
Ref(&'a ProductValue),
}
/// Equality across both representations of a [`Row`].
impl PartialEq for Row<'_> {
    /// Compares two rows, including the mixed case where one side is a
    /// [`RowRef`] and the other a [`ProductValue`].
    fn eq(&self, other: &Self) -> bool {
        match self {
            Self::Ptr(lhs) => match other {
                Self::Ptr(rhs) => lhs == rhs,
                Self::Ref(rhs) => lhs == *rhs,
            },
            Self::Ref(lhs) => match other {
                Self::Ref(rhs) => lhs == rhs,
                // Keep the `RowRef` on the left-hand side, as in the opposite mixed case.
                Self::Ptr(rhs) => rhs == *lhs,
            },
        }
    }
}
// Marker impl: declares that the `PartialEq` impl for `Row` is a full equivalence relation.
impl Eq for Row<'_> {}
/// Hashing for [`Row`], delegating to the wrapped representation.
impl Hash for Row<'_> {
    /// Feeds the wrapped value into `state`; the variant itself is not hashed.
    fn hash<H: Hasher>(&self, state: &mut H) {
        match self {
            Self::Ptr(row_ref) => Hash::hash(row_ref, state),
            Self::Ref(product) => Hash::hash(product, state),
        }
    }
}
impl Row<'_> {
pub fn to_product_value(&self) -> ProductValue {
match self {
Self::Ptr(ptr) => ptr.to_product_value(),
Self::Ref(val) => (*val).clone(),
}
}
pub fn project_product(self, cols: &ColList) -> Result<ProductValue, InvalidFieldError> {
match self {
Self::Ptr(ptr) => ptr.project_product(cols),
Self::Ref(val) => val.project_product(cols),
}
}
}
// Serialization for `Row`: forwards to `serialize` on whichever representation is held.
impl_serialize!(['a] Row<'a>, (self, ser) => match self {
Self::Ptr(row) => row.serialize(ser),
Self::Ref(row) => row.serialize(ser),
});
/// BSATN encoding for [`Row`]; every method delegates to the wrapped representation.
impl ToBsatn for Row<'_> {
    /// Forwards to the wrapped value's `static_bsatn_size`.
    fn static_bsatn_size(&self) -> Option<u16> {
        match self {
            Self::Ptr(row_ref) => row_ref.static_bsatn_size(),
            Self::Ref(product) => product.static_bsatn_size(),
        }
    }

    /// Encodes the wrapped value into `buf` through its own `to_bsatn_extend`.
    ///
    /// # Errors
    ///
    /// Propagates the [`EncodeError`] reported by the wrapped representation.
    fn to_bsatn_extend(&self, buf: &mut (impl BufWriter + BufReservedFill)) -> std::result::Result<(), EncodeError> {
        match self {
            Self::Ptr(row_ref) => row_ref.to_bsatn_extend(buf),
            Self::Ref(product) => product.to_bsatn_extend(buf),
        }
    }

    /// Encodes the wrapped value into a fresh `Vec<u8>` through its own `to_bsatn_vec`.
    ///
    /// # Errors
    ///
    /// Propagates the [`EncodeError`] reported by the wrapped representation.
    fn to_bsatn_vec(&self) -> std::result::Result<Vec<u8>, EncodeError> {
        match self {
            Self::Ptr(row_ref) => row_ref.to_bsatn_vec(),
            Self::Ref(product) => product.to_bsatn_vec(),
        }
    }
}
/// Field projection for [`Row`], delegating to the wrapped representation.
impl ProjectField for Row<'_> {
    /// Returns the value that `field` selects from this row.
    fn project(&self, field: &TupleField) -> AlgebraicValue {
        match self {
            Self::Ptr(row_ref) => row_ref.project(field),
            Self::Ref(product) => product.project(field),
        }
    }
}
/// Each query operator returns a tuple made of one or more [`Row`]s.
#[derive(Clone)]
pub enum Tuple<'a> {
/// A single [`Row`].
Row(Row<'a>),
/// A temporary returned by a join operator, holding the joined [`Row`]s.
Join(Vec<Row<'a>>),
}
// NOTE(review): presumably a compile-time check that `Tuple` occupies exactly 40 bytes,
// guarding against accidental growth of the type — confirm against `static_assert_size!`.
static_assert_size!(Tuple, 40);
/// Field projection for [`Tuple`].
impl ProjectField for Tuple<'_> {
    /// Returns the value that `field` selects from this tuple.
    ///
    /// For a `Join`, `field.label_pos` picks which joined row to project from.
    ///
    /// # Panics
    ///
    /// Panics for a `Join` when `field.label_pos` is `None` or does not index
    /// one of the joined rows.
    fn project(&self, field: &TupleField) -> AlgebraicValue {
        match self {
            Self::Row(row) => row.project(field),
            Self::Join(rows) => {
                let row = field.label_pos.and_then(|pos| rows.get(pos)).unwrap();
                row.project(field)
            }
        }
    }
}
impl<'a> Tuple<'a> {
    /// Select the tuple element at position `i`.
    ///
    /// Returns `None` when `self` is a single [`Tuple::Row`], or when `i` is out of
    /// range for the rows of a [`Tuple::Join`].
    fn select(self, i: usize) -> Option<Row<'a>> {
        match self {
            Self::Row(_) => None,
            // `Vec::swap_remove` panics on an out-of-range index, so check the bound first
            // and report the missing element through the `Option` instead.
            // The tuple is consumed here, so the reordering done by `swap_remove`
            // is never observed.
            Self::Join(mut rows) => (i < rows.len()).then(|| rows.swap_remove(i)),
        }
    }

    /// Append a [`Row`] to a tuple.
    ///
    /// A single `Row` becomes a two-element `Join`; an existing `Join` grows by one.
    fn append(self, ptr: Row<'a>) -> Self {
        match self {
            Self::Row(row) => Self::Join(vec![row, ptr]),
            Self::Join(mut rows) => {
                rows.push(ptr);
                Self::Join(rows)
            }
        }
    }

    /// Concatenate two tuples: every row of `with` is appended, in order, after the
    /// rows of `self`.
    fn join(self, with: Self) -> Self {
        match with {
            Self::Row(row) => self.append(row),
            Self::Join(rows) => rows.into_iter().fold(self, |tuple, row| tuple.append(row)),
        }
    }
}
/// Iterator over the [`ProductValue`]s of one side of a table's delta,
/// as returned by `DeltaStore::delta_scan`.
pub struct DeltaScanIter<'a> {
/// The underlying slice iterator; `None` when there are no rows to iterate.
iter: Option<std::slice::Iter<'a, ProductValue>>,
}
/// Yields the rows of the wrapped slice iterator, or nothing when there is none.
impl<'a> Iterator for DeltaScanIter<'a> {
    type Item = &'a ProductValue;

    /// Returns the next row, or `None` once the rows are exhausted or when there
    /// is no underlying iterator.
    fn next(&mut self) -> Option<Self::Item> {
        self.iter.as_mut()?.next()
    }

    /// Forwards the exact bounds of the underlying slice iterator so that consumers
    /// such as `collect` and `extend` can reserve capacity up front.
    /// Without an underlying iterator, the length is exactly zero.
    fn size_hint(&self) -> (usize, Option<usize>) {
        self.iter.as_ref().map_or((0, Some(0)), |rows| rows.size_hint())
    }
}