From 341fee3720900e558f221ce05f567ae55ec3b49a Mon Sep 17 00:00:00 2001 From: joshua-spacetime Date: Mon, 4 May 2026 22:41:21 -0700 Subject: [PATCH] Merge execution metrics with tx metrics --- crates/core/src/host/module_host.rs | 71 ++++++++----------- crates/core/src/host/v8/mod.rs | 30 ++------ .../subscription/module_subscription_actor.rs | 39 ++++++---- 3 files changed, 62 insertions(+), 78 deletions(-) diff --git a/crates/core/src/host/module_host.rs b/crates/core/src/host/module_host.rs index be447bdb3..e329702fb 100644 --- a/crates/core/src/host/module_host.rs +++ b/crates/core/src/host/module_host.rs @@ -1583,22 +1583,13 @@ impl ModuleHost { .call_view_command(label, cmd) .await .map_err(|e| DBError::Other(anyhow::anyhow!(e)))?; - Self::record_view_command_metrics_for_result(&self.info, self.relational_db(), metric, &result); + Self::record_view_command_round_trip(&self.info, metric); result } } } - pub(in crate::host) fn record_view_command_metrics_for_result( - info: &ModuleInfo, - db: &RelationalDB, - metric: ViewCommandMetric, - result: &ViewCommandResult, - ) { - if let Ok(Some(metrics)) = result { - db.exec_counters_for(metric.workload).record(metrics); - } - + pub(in crate::host) fn record_view_command_round_trip(info: &ModuleInfo, metric: ViewCommandMetric) { match metric.workload { WorkloadType::Subscribe => info .metrics @@ -2753,7 +2744,7 @@ impl ModuleHost { async |_, _| unreachable!("one-off query JS path is handled before Self::call"), ) .await?; - Self::record_one_off_query_metrics_for_result(&self.info, self.relational_db(), timer, &result); + Self::record_one_off_query_round_trip(&self.info, timer); result.map(|_| ()) } } @@ -2843,7 +2834,7 @@ impl ModuleHost { into_message: impl FnOnce(OneOffQueryResponseMessage) -> SerializableMessage, ) -> OneOffQueryResult { let (tx_offset_sender, tx_offset_receiver) = oneshot::channel(); - let tx = scopeguard::guard(db.begin_tx(Workload::Sql), |tx| { + let mut tx = scopeguard::guard(db.begin_tx(Workload::Sql), |tx| { let (tx_offset, tx_metrics, reducer) = db.release_tx(tx); let _ = tx_offset_sender.send(tx_offset); db.report_read_tx_metrics(reducer, tx_metrics); @@ -2914,15 +2905,18 @@ impl ModuleHost { let total_host_execution_duration = timer.elapsed().into(); let (message, metrics): (SerializableMessage, Option) = match result { - Ok((rows, metrics)) => ( - into_message(OneOffQueryResponseMessage { - message_id, - error: None, - results: vec![rows], - total_host_execution_duration, - }), - Some(metrics), - ), + Ok((rows, metrics)) => { + tx.metrics.merge(metrics); + ( + into_message(OneOffQueryResponseMessage { + message_id, + error: None, + results: vec![rows], + total_host_execution_duration, + }), + Some(metrics), + ) + } Err(err) => ( into_message(OneOffQueryResponseMessage { message_id, @@ -2986,7 +2980,7 @@ impl ModuleHost { rlb_pool, } = params; let (tx_offset_sender, tx_offset_receiver) = oneshot::channel(); - let tx = scopeguard::guard(db.begin_tx(Workload::Sql), |tx| { + let mut tx = scopeguard::guard(db.begin_tx(Workload::Sql), |tx| { let (tx_offset, tx_metrics, reducer) = db.release_tx(tx); let _ = tx_offset_sender.send(tx_offset); db.report_read_tx_metrics(reducer, tx_metrics); @@ -3048,15 +3042,18 @@ impl ModuleHost { })(); let (message, metrics) = match result { - Ok((rows, metrics)) => ( - ws_v2::OneOffQueryResult { - request_id, - result: Ok(ws_v2::QueryRows { - tables: vec![rows].into_boxed_slice(), - }), - }, - Some(metrics), - ), + Ok((rows, metrics)) => { + tx.metrics.merge(metrics); + ( + ws_v2::OneOffQueryResult { + request_id, + result: Ok(ws_v2::QueryRows { + tables: vec![rows].into_boxed_slice(), + }), + }, + Some(metrics), + ) + } Err(err) => ( ws_v2::OneOffQueryResult { request_id, @@ -3070,15 +3067,7 @@ impl ModuleHost { Ok(metrics) } - pub(in crate::host) fn record_one_off_query_metrics_for_result( - info: &ModuleInfo, - db: &RelationalDB, - timer: Instant, - result: &OneOffQueryResult, - ) { - if let Ok(Some(metrics)) = result { - db.exec_counters_for(WorkloadType::Sql).record(metrics); - } + pub(in crate::host) fn record_one_off_query_round_trip(info: &ModuleInfo, timer: Instant) { info.metrics .request_round_trip_sql .observe(timer.elapsed().as_secs_f64()); diff --git a/crates/core/src/host/v8/mod.rs b/crates/core/src/host/v8/mod.rs index e34a80069..3276e6c74 100644 --- a/crates/core/src/host/v8/mod.rs +++ b/crates/core/src/host/v8/mod.rs @@ -1344,13 +1344,8 @@ fn handle_main_worker_request( }), JsMainWorkerRequest::CallViewDetached { cmd, metric, on_panic } => { handle_detached_worker_request("call_view", on_panic, || { - let (res, trapped) = instance_common.handle_cmd(cmd, inst); - ModuleHost::record_view_command_metrics_for_result( - &module_common.info(), - module_common.replica_ctx().relational_db(), - metric, - &res, - ); + let (_, trapped) = instance_common.handle_cmd(cmd, inst); + ModuleHost::record_view_command_round_trip(&module_common.info(), metric); trapped }) } @@ -1365,12 +1360,7 @@ fn handle_main_worker_request( if let Err(err) = &res { log::warn!("detached one-off query failed: {err:#}"); } - ModuleHost::record_one_off_query_metrics_for_result( - &module_common.info(), - module_common.replica_ctx().relational_db(), - timer, - &res, - ); + ModuleHost::record_one_off_query_round_trip(&module_common.info(), timer); false }) } @@ -1381,12 +1371,7 @@ fn handle_main_worker_request( if let Err(err) = &res { log::warn!("detached one-off query failed: {err:#}"); } - ModuleHost::record_one_off_query_metrics_for_result( - &module_common.info(), - module_common.replica_ctx().relational_db(), - timer, - &res, - ); + ModuleHost::record_one_off_query_round_trip(&module_common.info(), timer); false }) } @@ -1397,12 +1382,7 @@ fn handle_main_worker_request( if let Err(err) = &res { log::warn!("detached one-off query failed: {err:#}"); } - ModuleHost::record_one_off_query_metrics_for_result( - &module_common.info(), - module_common.replica_ctx().relational_db(), - timer, - &res, - ); + ModuleHost::record_one_off_query_round_trip(&module_common.info(), timer); false }) } diff --git a/crates/core/src/subscription/module_subscription_actor.rs b/crates/core/src/subscription/module_subscription_actor.rs index 7c549cda1..80f43a71c 100644 --- a/crates/core/src/subscription/module_subscription_actor.rs +++ b/crates/core/src/subscription/module_subscription_actor.rs @@ -666,7 +666,7 @@ impl ModuleSubscriptions { let mut_tx = ScopeGuard::::into_inner(mut_tx); - let (tx, tx_offset, trapped) = + let (mut tx, tx_offset, trapped) = self.materialize_views_and_downgrade_tx(mut_tx, instance, &query, auth.caller())?; let (table_rows, metrics) = return_on_err_with_sql_bool!( @@ -674,6 +674,7 @@ impl ModuleSubscriptions { query.sql(), send_err_msg ); + tx.metrics.merge(metrics); // It acquires the subscription lock after `eval`, allowing `add_subscription` to run concurrently. // This also makes it possible for `broadcast_event` to get scheduled before the subsequent part here @@ -751,13 +752,14 @@ impl ModuleSubscriptions { return Ok(None); }; - let (tx, tx_offset) = self.unsubscribe_views(query, auth.caller())?; + let (mut tx, tx_offset) = self.unsubscribe_views(query, auth.caller())?; let (table_rows, metrics) = return_on_err_with_sql!( self.evaluate_initial_subscription(sender.clone(), query.clone(), &tx, &auth, TableUpdateType::Unsubscribe), query.sql(), send_err_msg ); + tx.metrics.merge(metrics); // Note: to make sure transaction updates are consistent, we need to put this in the broadcast // queue while we are still holding a read-lock on the database. @@ -831,7 +833,7 @@ impl ModuleSubscriptions { }; let mut_tx = ScopeGuard::::into_inner(mut_tx); - let (tx, tx_offset) = self.unsubscribe_views_and_downgrade_tx(mut_tx, &removed_queries, auth.caller())?; + let (mut tx, tx_offset) = self.unsubscribe_views_and_downgrade_tx(mut_tx, &removed_queries, auth.caller())?; let (update, metrics) = return_on_err!( self.evaluate_queries( @@ -844,6 +846,7 @@ impl ModuleSubscriptions { send_err_msg, None ); + tx.metrics.merge(metrics); // How many queries did we evaluate? subscription_metrics @@ -947,7 +950,7 @@ impl ModuleSubscriptions { }; let mut_tx = ScopeGuard::::into_inner(mut_tx); - let (tx, tx_offset) = self.unsubscribe_views_and_downgrade_tx(mut_tx, &removed_queries, auth.caller())?; + let (mut tx, tx_offset) = self.unsubscribe_views_and_downgrade_tx(mut_tx, &removed_queries, auth.caller())?; let (rows, metrics) = if request.flags == ws_v2::UnsubscribeFlags::SendDroppedRows { let (update, metrics) = return_on_err!( @@ -977,6 +980,9 @@ impl ModuleSubscriptions { } else { (None, None) }; + if let Some(metrics) = metrics { + tx.metrics.merge(metrics); + } let _ = self.broadcast_queue.send_client_message_v2( sender.clone(), @@ -1257,11 +1263,12 @@ impl ModuleSubscriptions { let mut_tx = ScopeGuard::::into_inner(mut_tx); - let (tx, tx_offset, trapped) = + let (mut tx, tx_offset, trapped) = self.materialize_views_and_downgrade_tx(mut_tx, instance, &queries, auth.caller())?; let (update, metrics) = self.evaluate_queries(sender.clone(), &queries, &tx, &auth, TableUpdateType::Subscribe)?; + tx.metrics.merge(metrics); subscription_metrics.num_queries_evaluated.inc_by(queries.len() as _); @@ -1362,7 +1369,7 @@ impl ModuleSubscriptions { let mut_tx = ScopeGuard::::into_inner(mut_tx); - let (tx, tx_offset, trapped) = + let (mut tx, tx_offset, trapped) = self.materialize_views_and_downgrade_tx(mut_tx, instance, &queries, auth.caller())?; let Ok((update, metrics)) = @@ -1383,6 +1390,7 @@ impl ModuleSubscriptions { send_err_msg("Internal error evaluating queries".into()); return Ok((None, trapped)); }; + tx.metrics.merge(metrics); // How many queries did we actually evaluate? subscription_metrics.num_queries_evaluated.inc_by(queries.len() as _); @@ -1479,7 +1487,7 @@ impl ModuleSubscriptions { subscription_metrics, )?; - let (tx, tx_offset, trapped) = + let (mut tx, tx_offset, trapped) = self.materialize_views_and_downgrade_tx(mut_tx, instance, &queries, auth.caller())?; check_row_limit( @@ -1497,16 +1505,22 @@ impl ModuleSubscriptions { // Record how long it took to compile the subscription drop(compile_timer); - let tx = DeltaTx::from(&*tx); + let delta_tx = DeltaTx::from(&*tx); let (database_update, metrics, query_metrics) = match sender.config.protocol { - Protocol::Binary => execute_plans(&auth, &queries, &tx, TableUpdateType::Subscribe, &self.bsatn_rlb_pool) - .map(|(table_update, metrics, query_metrics)| { + Protocol::Binary => execute_plans( + &auth, + &queries, + &delta_tx, + TableUpdateType::Subscribe, + &self.bsatn_rlb_pool, + ) + .map(|(table_update, metrics, query_metrics)| { (ws_v1::FormatSwitch::Bsatn(table_update), metrics, query_metrics) })?, Protocol::Text => execute_plans( &auth, &queries, - &tx, + &delta_tx, TableUpdateType::Subscribe, &JsonRowListBuilderFakePool, ) @@ -1516,6 +1530,7 @@ impl ModuleSubscriptions { }; record_query_metrics(&self.relational_db.database_identity(), query_metrics); + tx.metrics.merge(metrics); // It acquires the subscription lock after `eval`, allowing `add_subscription` to run concurrently. // This also makes it possible for `broadcast_event` to get scheduled before the subsequent part here @@ -1730,7 +1745,7 @@ impl ModuleSubscriptions { sender: Identity, ) -> Result<(TxGuard, TransactionOffset), DBError> { Self::_unsubscribe_views(&mut tx, view_collector, sender)?; - let (tx_data, tx_metrics_mut, tx) = self.relational_db.commit_tx_downgrade(tx, Workload::Subscribe); + let (tx_data, tx_metrics_mut, tx) = self.relational_db.commit_tx_downgrade(tx, Workload::Unsubscribe); let opts = GuardTxOptions::from_mut(tx_data, tx_metrics_mut); Ok(self.guard_tx(tx, opts)) }