mirror of
https://github.com/clockworklabs/SpacetimeDB.git
synced 2026-05-06 07:26:43 -04:00
Make all VmMetrics for the reducers and views of a module in InstanceCommon::new (#4106)
# Description of Changes
With this PR, all `VmMetrics` for all the reducers and views of a module
are made in `InstanceCommon::new`
so that they are never made in `call_{reducer/view}_with_tx`.
Here's a flamegraph before this PR, zooming in on the smaller
`call_reducer_with_tx`
<img width="1877" height="617" alt="image"
src="https://github.com/user-attachments/assets/9751c066-2bf0-4ded-a091-afa7d3b5dd75"
/>
And after, for the same `call_reducer_with_tx`:
<img width="1883" height="610" alt="image"
src="https://github.com/user-attachments/assets/27083acd-d4c9-4b69-94c7-c26c7f1e7cef"
/>
Here are the performance numbers:
```
wasm, index=hash, branch=master, commit = af4d3f39e4
ran for 10.097515999 seconds
completed 1310720
throughput was 129806.18204812016 TPS
wasm, index=hash, branch=master, commit = bac3d2a5a928af896d315fcfdf709d42e3577b66
ran for 10.842949063 seconds
completed 1474560
throughput was 135992.52301495385 TPS
```
As you can see, this is a gain of about 6k TPS on the phoenix_nap
machine.
The second commit also adds `d3-flamegraph-base.html` and other profiling artifacts
(e.g. `perf.data`, `flamegraph.html`) to `.gitignore` to facilitate taking flamegraphs.
# API and ABI breaking changes
None
# Expected complexity level and risk
1, trivial changes that are not load bearing.
This commit is contained in:
committed by
GitHub
parent
7f6fd18018
commit
7138defc6e
@@ -2,6 +2,9 @@
|
||||
# Created by https://www.toptal.com/developers/gitignore/api/rust,node,visualstudiocode
|
||||
# Edit at https://www.toptal.com/developers/gitignore?templates=rust,node,visualstudiocode
|
||||
|
||||
perf.data
|
||||
perf.data.old
|
||||
flamegraph.html
|
||||
flamegraphs/*.svg
|
||||
flamegraphs/flamegraph.folded
|
||||
|
||||
|
||||
@@ -49,6 +49,7 @@ use spacetimedb_primitives::{ProcedureId, TableId, ViewFnPtr, ViewId};
|
||||
use spacetimedb_sats::algebraic_type::fmt::fmt_algebraic_type;
|
||||
use spacetimedb_sats::{AlgebraicType, AlgebraicTypeRef, Deserialize, ProductValue, Typespace, WithTypespace};
|
||||
use spacetimedb_schema::auto_migrate::{MigratePlan, MigrationPolicy, MigrationPolicyError};
|
||||
use spacetimedb_schema::def::deserialize::FunctionDef;
|
||||
use spacetimedb_schema::def::{ModuleDef, ViewDef};
|
||||
use spacetimedb_subscription::SubscriptionPlan;
|
||||
use std::sync::Arc;
|
||||
@@ -492,12 +493,17 @@ pub struct InstanceCommon {
|
||||
energy_monitor: Arc<dyn EnergyMonitor>,
|
||||
allocated_memory: usize,
|
||||
metric_wasm_memory_bytes: IntGauge,
|
||||
vm_metrics: AllVmMetrics,
|
||||
}
|
||||
|
||||
impl InstanceCommon {
|
||||
pub(crate) fn new(module: &ModuleCommon) -> Self {
|
||||
let info = module.info();
|
||||
let vm_metrics = AllVmMetrics::new(&info);
|
||||
|
||||
Self {
|
||||
info: module.info(),
|
||||
vm_metrics,
|
||||
energy_monitor: module.energy_monitor(),
|
||||
// Will be updated on the first reducer call.
|
||||
allocated_memory: 0,
|
||||
@@ -790,7 +796,6 @@ impl InstanceCommon {
|
||||
|
||||
let replica_ctx = inst.replica_ctx();
|
||||
let stdb = &*replica_ctx.relational_db.clone();
|
||||
let database_identity = replica_ctx.database_identity;
|
||||
let info = self.info.clone();
|
||||
let reducer_def = info.module_def.reducer_by_id(reducer_id);
|
||||
let reducer_name = &*reducer_def.name;
|
||||
@@ -812,17 +817,15 @@ impl InstanceCommon {
|
||||
let tx = tx.unwrap_or_else(|| stdb.begin_mut_tx(IsolationLevel::Serializable, workload));
|
||||
let mut tx_slot = inst.tx_slot();
|
||||
|
||||
let vm_metrics = VmMetrics::new(&database_identity, reducer_name);
|
||||
let vm_metrics = self.vm_metrics.get_for_reducer_id(reducer_id);
|
||||
let _guard = vm_metrics.timer_guard_for_reducer_plus_query(tx.timer);
|
||||
|
||||
let (mut tx, result) = tx_slot.set(tx, || {
|
||||
self.call_function(caller_identity, reducer_name, |budget| inst.call_reducer(op, budget))
|
||||
});
|
||||
|
||||
// Report the reducer execution metrics
|
||||
vm_metrics.report_energy_used(result.stats.energy_used());
|
||||
vm_metrics.report_total_duration(result.stats.total_duration());
|
||||
vm_metrics.report_abi_duration(result.stats.abi_duration());
|
||||
// Report execution metrics on each reducer call.
|
||||
vm_metrics.report(&result.stats);
|
||||
|
||||
// An outer error occurred.
|
||||
// This signifies a logic error in the module rather than a properly
|
||||
@@ -855,7 +858,7 @@ impl InstanceCommon {
|
||||
// We handle OnConnect events before running the reducer.
|
||||
let res = match reducer_def.lifecycle {
|
||||
Some(Lifecycle::OnDisconnect) => {
|
||||
tx.delete_st_client(caller_identity, caller_connection_id, database_identity)
|
||||
tx.delete_st_client(caller_identity, caller_connection_id, info.database_identity)
|
||||
}
|
||||
_ => Ok(()),
|
||||
};
|
||||
@@ -1117,13 +1120,10 @@ impl InstanceCommon {
|
||||
})
|
||||
});
|
||||
|
||||
let replica_ctx = inst.replica_ctx();
|
||||
let stdb = &*replica_ctx.relational_db.clone();
|
||||
let database_identity = replica_ctx.database_identity;
|
||||
let vm_metrics = VmMetrics::new(&database_identity, &view_name);
|
||||
|
||||
// Report execution metrics on each view call
|
||||
vm_metrics.report(&result.stats);
|
||||
// Report execution metrics on each view call.
|
||||
self.vm_metrics
|
||||
.get_for_view_id(view_id, &self.info.database_identity, &view_name)
|
||||
.report(&result.stats);
|
||||
|
||||
let trapped = matches!(result.call_result, Err(ExecutionError::Trap(_)));
|
||||
|
||||
@@ -1166,6 +1166,8 @@ impl InstanceCommon {
|
||||
.context("Error executing raw SQL returned by view".to_string())?,
|
||||
};
|
||||
|
||||
let replica_ctx = inst.replica_ctx();
|
||||
let stdb = &*replica_ctx.relational_db.clone();
|
||||
let res = match sender {
|
||||
Some(sender) => stdb.materialize_view(&mut tx, table_id, sender, rows),
|
||||
None => stdb.materialize_anonymous_view(&mut tx, table_id, rows),
|
||||
@@ -1338,7 +1340,71 @@ impl InstanceCommon {
|
||||
crate::host::scheduler::call_scheduled_function(&self.info.clone(), params, self, inst).await
|
||||
}
|
||||
}
|
||||
|
||||
/// Pre-fetched VM metrics counters for all reducers and views in a module.
|
||||
/// Anonymous views have lazily fetched metrics counters.
|
||||
struct AllVmMetrics {
|
||||
// We use a `Vec` here as the number of reducers + views
|
||||
// will likely be lower than e.g., 128, which would take up a page (4096 / 32).
|
||||
// TODO(perf, centril): Define a `VecMapWithFallback<N>`
|
||||
// that falls back to `HashMap` when exceeding `N` entries.
|
||||
// This could be useful elsewhere for e.g., TableId => X maps and similar.
|
||||
counters: Vec<VmMetrics>,
|
||||
num_reducers: u32,
|
||||
}
|
||||
|
||||
impl AllVmMetrics {
|
||||
/// Pre-fetch all vm metrics counters for the module in `info`.
|
||||
fn new(info: &ModuleInfo) -> Self {
|
||||
// These are the reducers:
|
||||
let def = &info.module_def;
|
||||
let reducers = def.reducer_ids_and_defs();
|
||||
let num_reducers = reducers.len() as u32;
|
||||
let reducers = reducers.map(|(_, def)| def.name());
|
||||
|
||||
// These are the views:
|
||||
let views = def.views().map(|def| def.name());
|
||||
|
||||
// Pre-fetch the metrics for both:
|
||||
let counters = reducers
|
||||
.chain(views)
|
||||
.map(|name| VmMetrics::new(&info.database_identity, name))
|
||||
.collect();
|
||||
|
||||
Self { counters, num_reducers }
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn get_for_index(&self, index: u32) -> Option<VmMetrics> {
|
||||
self.counters.get(index as usize).cloned()
|
||||
}
|
||||
|
||||
/// Returns the vm metrics counters for `id`,
|
||||
/// or panics if `id` was not pre-fetched in [`AllVmMetrics::new`].
|
||||
#[inline]
|
||||
fn get_for_reducer_id(&self, id: ReducerId) -> VmMetrics {
|
||||
self.get_for_index(id.0)
|
||||
.expect("all counters for reducers should've been pre-fetched")
|
||||
}
|
||||
|
||||
/// Returns the vm metrics counters for `id`,
|
||||
/// or creates them on the fly from `identity` and `name` if `id` was not pre-fetched in [`AllVmMetrics::new`].
|
||||
#[inline]
|
||||
fn get_for_view_id(&self, id: ViewId, identity: &Identity, name: &str) -> VmMetrics {
|
||||
// Counters for the first view start after counters for the last reducer.
|
||||
self.get_for_index(self.num_reducers + id.0)
|
||||
// For anonymous views, the `id` doesn't have pre-fetched counters available.
|
||||
// Reducers shouldn't have to pay for adding an `Option` layer in the map,
|
||||
// which would be necessary due to `id`s being random here,
|
||||
// so just create `VmMetrics` on the fly instead.
|
||||
// TODO(perf, centril): We could ostensibly add another map for anonymous views,
|
||||
// but this doesn't seem to be a pressing performance concern at the moment.
|
||||
.unwrap_or_else(|| VmMetrics::new(identity, name))
|
||||
}
|
||||
}
|
||||
|
||||
/// VM-related metrics for reducer execution.
|
||||
#[derive(Clone)]
|
||||
struct VmMetrics {
|
||||
/// The time spent executing a reducer plus evaluating its subscription queries.
|
||||
reducer_plus_query_duration: Histogram,
|
||||
|
||||
@@ -172,6 +172,11 @@ impl ModuleDef {
|
||||
self.reducers.values()
|
||||
}
|
||||
|
||||
/// Returns an iterator over all reducer ids and definitions.
|
||||
pub fn reducer_ids_and_defs(&self) -> impl ExactSizeIterator<Item = (ReducerId, &ReducerDef)> {
|
||||
self.reducers.values().enumerate().map(|(idx, def)| (idx.into(), def))
|
||||
}
|
||||
|
||||
/// The procedures of the module definition.
|
||||
pub fn procedures(&self) -> impl Iterator<Item = &ProcedureDef> {
|
||||
self.procedures.values()
|
||||
|
||||
File diff suppressed because one or more lines are too long
Reference in New Issue
Block a user