feat(748): Add compile time feature flag for db metrics (#749)

Closes #748.

This patch adds a temporary feature flag for enabling db metrics.

Metrics are currently recorded synchronously, which can have a noticeable impact on latency.

Compiling with this flag enables metrics collection. Because the flag is on by default, metrics will be collected by default.

Note that this flag is temporary: it will be removed once metrics are recorded asynchronously.
This commit is contained in:
joshua-spacetime
2024-01-26 17:33:26 -08:00
committed by GitHub
parent e395e4ee30
commit 5df73e89e7
10 changed files with 82 additions and 55 deletions
+2 -2
View File
@@ -45,7 +45,7 @@ jobs:
sudo chmod 777 /stdb
- name: Run cargo test
run: cargo test --all --features odb_rocksdb,odb_sled
run: cargo test --all --features odb_rocksdb,odb_sled,metrics
lints:
name: Lints
@@ -61,7 +61,7 @@ jobs:
run: cargo fmt --all -- --check
- name: Run cargo clippy
run: cargo clippy --all --tests --features odb_rocksdb,odb_sled -- -D warnings
run: cargo clippy --all --tests --features odb_rocksdb,odb_sled,metrics -- -D warnings
- name: Check benchmarks
run: cd crates/bench && cargo check --benches
+3 -1
View File
@@ -94,10 +94,12 @@ wasmtime.workspace = true
rocksdb = {workspace = true, optional = true}
[features]
# Enable metrics in spacetimedb.
metrics = []
# Optional storage engines.
odb_rocksdb = ["dep:rocksdb"]
odb_sled = []
default = ["odb_sled"]
default = ["odb_sled", "metrics"]
[dev-dependencies]
spacetimedb-lib = { path = "../lib", features = ["proptest"] }
+2
View File
@@ -377,6 +377,7 @@ impl CommitLogMut {
let operation = match record.op {
TxOp::Insert(_) => {
// Increment rows inserted metric
#[cfg(feature = "metrics")]
DB_METRICS
.rdb_num_rows_inserted
.with_label_values(workload, db, reducer, &table_id, table_name)
@@ -390,6 +391,7 @@ impl CommitLogMut {
}
TxOp::Delete => {
// Increment rows deleted metric
#[cfg(feature = "metrics")]
DB_METRICS
.rdb_num_rows_deleted
.with_label_values(workload, db, reducer, &table_id, table_name)
@@ -385,6 +385,7 @@ pub struct CommittedIndexIter<'a> {
pub(crate) num_committed_rows_fetched: u64,
}
#[cfg(feature = "metrics")]
impl Drop for CommittedIndexIter<'_> {
fn drop(&mut self) {
let table_name = self
@@ -346,14 +346,17 @@ impl traits::MutTx for Locking {
let reducer = ctx.reducer_name();
let elapsed_time = tx.timer.elapsed();
let cpu_time = elapsed_time - tx.lock_wait_time;
#[cfg(feature = "metrics")]
DB_METRICS
.rdb_num_txns
.with_label_values(workload, db, reducer, &false)
.inc();
#[cfg(feature = "metrics")]
DB_METRICS
.rdb_txn_cpu_time_sec
.with_label_values(workload, db, reducer)
.observe(cpu_time.as_secs_f64());
#[cfg(feature = "metrics")]
DB_METRICS
.rdb_txn_elapsed_time_sec
.with_label_values(workload, db, reducer)
@@ -362,44 +365,47 @@ impl traits::MutTx for Locking {
}
fn commit_mut_tx(&self, ctx: &ExecutionContext, tx: Self::MutTx) -> super::Result<Option<TxData>> {
let workload = &ctx.workload();
let db = &ctx.database();
let reducer = ctx.reducer_name();
let elapsed_time = tx.timer.elapsed();
let cpu_time = elapsed_time - tx.lock_wait_time;
#[cfg(feature = "metrics")]
{
let workload = &ctx.workload();
let db = &ctx.database();
let reducer = ctx.reducer_name();
let elapsed_time = tx.timer.elapsed();
let cpu_time = elapsed_time - tx.lock_wait_time;
let elapsed_time = elapsed_time.as_secs_f64();
let cpu_time = cpu_time.as_secs_f64();
// Note, we record empty transactions in our metrics.
// That is, transactions that don't write any rows to the commit log.
DB_METRICS
.rdb_num_txns
.with_label_values(workload, db, reducer, &true)
.inc();
DB_METRICS
.rdb_txn_cpu_time_sec
.with_label_values(workload, db, reducer)
.observe(cpu_time);
DB_METRICS
.rdb_txn_elapsed_time_sec
.with_label_values(workload, db, reducer)
.observe(elapsed_time);
let elapsed_time = elapsed_time.as_secs_f64();
let cpu_time = cpu_time.as_secs_f64();
// Note, we record empty transactions in our metrics.
// That is, transactions that don't write any rows to the commit log.
DB_METRICS
.rdb_num_txns
.with_label_values(workload, db, reducer, &true)
.inc();
DB_METRICS
.rdb_txn_cpu_time_sec
.with_label_values(workload, db, reducer)
.observe(cpu_time);
DB_METRICS
.rdb_txn_elapsed_time_sec
.with_label_values(workload, db, reducer)
.observe(elapsed_time);
let mut guard = MAX_TX_CPU_TIME.lock().unwrap();
let max_cpu_time = *guard
.entry((*db, *workload, reducer.to_owned()))
.and_modify(|max| {
if cpu_time > *max {
*max = cpu_time;
}
})
.or_insert_with(|| cpu_time);
let mut guard = MAX_TX_CPU_TIME.lock().unwrap();
let max_cpu_time = *guard
.entry((*db, *workload, reducer.to_owned()))
.and_modify(|max| {
if cpu_time > *max {
*max = cpu_time;
}
})
.or_insert_with(|| cpu_time);
drop(guard);
DB_METRICS
.rdb_txn_cpu_time_sec_max
.with_label_values(workload, db, reducer)
.set(max_cpu_time);
drop(guard);
DB_METRICS
.rdb_txn_cpu_time_sec_max
.with_label_values(workload, db, reducer)
.set(max_cpu_time);
}
tx.commit()
}
@@ -89,23 +89,26 @@ impl StateView for TxId {
#[allow(dead_code)]
impl TxId {
pub(crate) fn release(self, ctx: &ExecutionContext) {
let workload = &ctx.workload();
let db = &ctx.database();
let reducer = ctx.reducer_name();
let elapsed_time = self.timer.elapsed();
let cpu_time = elapsed_time - self.lock_wait_time;
DB_METRICS
.rdb_num_txns
.with_label_values(workload, db, reducer, &false)
.inc();
DB_METRICS
.rdb_txn_cpu_time_sec
.with_label_values(workload, db, reducer)
.observe(cpu_time.as_secs_f64());
DB_METRICS
.rdb_txn_elapsed_time_sec
.with_label_values(workload, db, reducer)
.observe(elapsed_time.as_secs_f64());
#[cfg(feature = "metrics")]
{
let workload = &ctx.workload();
let db = &ctx.database();
let reducer = ctx.reducer_name();
let elapsed_time = self.timer.elapsed();
let cpu_time = elapsed_time - self.lock_wait_time;
DB_METRICS
.rdb_num_txns
.with_label_values(workload, db, reducer, &false)
.inc();
DB_METRICS
.rdb_txn_cpu_time_sec
.with_label_values(workload, db, reducer)
.observe(cpu_time.as_secs_f64());
DB_METRICS
.rdb_txn_elapsed_time_sec
.with_label_values(workload, db, reducer)
.observe(elapsed_time.as_secs_f64());
}
}
}
@@ -200,6 +200,7 @@ pub struct Iter<'a> {
num_committed_rows_fetched: u64,
}
#[cfg(feature = "metrics")]
impl Drop for Iter<'_> {
fn drop(&mut self) {
DB_METRICS
@@ -333,6 +334,7 @@ pub struct IndexSeekIterMutTxId<'a> {
pub(crate) num_committed_rows_fetched: u64,
}
#[cfg(feature = "metrics")]
impl Drop for IndexSeekIterMutTxId<'_> {
fn drop(&mut self) {
let table_name = self
+3
View File
@@ -429,6 +429,7 @@ impl RelationalDB {
}
pub fn drop_table(&self, ctx: &ExecutionContext, tx: &mut MutTx, table_id: TableId) -> Result<(), DBError> {
#[cfg(feature = "metrics")]
let _guard = DB_METRICS
.rdb_drop_table_time
.with_label_values(&table_id.0)
@@ -619,6 +620,7 @@ impl RelationalDB {
#[tracing::instrument(skip(self, tx, row))]
pub fn insert(&self, tx: &mut MutTx, table_id: TableId, row: ProductValue) -> Result<ProductValue, DBError> {
#[cfg(feature = "metrics")]
let _guard = DB_METRICS
.rdb_insert_row_time
.with_label_values(&table_id.0)
@@ -644,6 +646,7 @@ impl RelationalDB {
#[tracing::instrument(skip_all)]
pub fn delete_by_rel<R: Relation>(&self, tx: &mut MutTx, table_id: TableId, relation: R) -> u32 {
#[cfg(feature = "metrics")]
let _guard = DB_METRICS
.rdb_delete_by_rel_time
.with_label_values(&table_id.0)
@@ -159,6 +159,7 @@ impl WasmInstanceEnv {
// TODO: make this part of cvt(), maybe?
/// Gather the appropriate metadata and log a wasm_abi_call_duration_ns with the given AbiCall & duration
#[cfg(feature = "metrics")]
fn start_abi_call_timer(&self, call: AbiCall) -> prometheus::HistogramTimer {
let ctx = self.reducer_context();
let db = ctx.database();
@@ -381,6 +382,7 @@ impl WasmInstanceEnv {
pub fn insert(caller: Caller<'_, Self>, table_id: u32, row: WasmPtr<u8>, row_len: u32) -> RtResult<u32> {
// TODO: Instead of writing this metric on every insert call,
// we should aggregate and write at the end of the transaction.
#[cfg(feature = "metrics")]
let _guard = caller.data().start_abi_call_timer(AbiCall::Insert);
Self::cvt(caller, AbiCall::Insert, |caller| {
@@ -425,6 +427,7 @@ impl WasmInstanceEnv {
value_len: u32,
out: WasmPtr<u32>,
) -> RtResult<u32> {
#[cfg(feature = "metrics")]
let _guard = caller.data().start_abi_call_timer(AbiCall::DeleteByColEq);
Self::cvt_ret(caller, AbiCall::DeleteByColEq, out, |caller| {
@@ -564,6 +567,7 @@ impl WasmInstanceEnv {
val_len: u32,
out: WasmPtr<BufferIdx>,
) -> RtResult<u32> {
#[cfg(feature = "metrics")]
let _guard = caller.data().start_abi_call_timer(AbiCall::IterByColEq);
Self::cvt_ret(caller, AbiCall::IterByColEq, out, |caller| {
@@ -593,6 +597,7 @@ impl WasmInstanceEnv {
/// - a table with the provided `table_id` doesn't exist
// #[tracing::instrument(skip_all)]
pub fn iter_start(caller: Caller<'_, Self>, table_id: u32, out: WasmPtr<BufferIterIdx>) -> RtResult<u32> {
#[cfg(feature = "metrics")]
let _guard = caller.data().start_abi_call_timer(AbiCall::IterStart);
Self::cvt_ret(caller, AbiCall::IterStart, out, |caller| {
@@ -629,6 +634,7 @@ impl WasmInstanceEnv {
filter_len: u32,
out: WasmPtr<BufferIterIdx>,
) -> RtResult<u32> {
#[cfg(feature = "metrics")]
let _guard = caller.data().start_abi_call_timer(AbiCall::IterStartFiltered);
Self::cvt_ret(caller, AbiCall::IterStartFiltered, out, |caller| {
@@ -380,6 +380,7 @@ impl QuerySet {
});
}
}
#[cfg(feature = "metrics")]
record_query_duration_metrics(WorkloadType::Subscribe, &db.address(), start);
}
}
@@ -394,6 +395,7 @@ impl QuerySet {
}
}
#[cfg(feature = "metrics")]
fn record_query_duration_metrics(workload: WorkloadType, db: &Address, start: Instant) {
let query_duration = start.elapsed().as_secs_f64();