Merge execution metrics with tx metrics

This commit is contained in:
joshua-spacetime
2026-05-04 22:41:21 -07:00
parent b36a964d0a
commit 341fee3720
3 changed files with 62 additions and 78 deletions
+30 -41
View File
@@ -1583,22 +1583,13 @@ impl ModuleHost {
.call_view_command(label, cmd)
.await
.map_err(|e| DBError::Other(anyhow::anyhow!(e)))?;
Self::record_view_command_metrics_for_result(&self.info, self.relational_db(), metric, &result);
Self::record_view_command_round_trip(&self.info, metric);
result
}
}
}
pub(in crate::host) fn record_view_command_metrics_for_result(
info: &ModuleInfo,
db: &RelationalDB,
metric: ViewCommandMetric,
result: &ViewCommandResult,
) {
if let Ok(Some(metrics)) = result {
db.exec_counters_for(metric.workload).record(metrics);
}
pub(in crate::host) fn record_view_command_round_trip(info: &ModuleInfo, metric: ViewCommandMetric) {
match metric.workload {
WorkloadType::Subscribe => info
.metrics
@@ -2753,7 +2744,7 @@ impl ModuleHost {
async |_, _| unreachable!("one-off query JS path is handled before Self::call"),
)
.await?;
Self::record_one_off_query_metrics_for_result(&self.info, self.relational_db(), timer, &result);
Self::record_one_off_query_round_trip(&self.info, timer);
result.map(|_| ())
}
}
@@ -2843,7 +2834,7 @@ impl ModuleHost {
into_message: impl FnOnce(OneOffQueryResponseMessage<F>) -> SerializableMessage,
) -> OneOffQueryResult {
let (tx_offset_sender, tx_offset_receiver) = oneshot::channel();
let tx = scopeguard::guard(db.begin_tx(Workload::Sql), |tx| {
let mut tx = scopeguard::guard(db.begin_tx(Workload::Sql), |tx| {
let (tx_offset, tx_metrics, reducer) = db.release_tx(tx);
let _ = tx_offset_sender.send(tx_offset);
db.report_read_tx_metrics(reducer, tx_metrics);
@@ -2914,15 +2905,18 @@ impl ModuleHost {
let total_host_execution_duration = timer.elapsed().into();
let (message, metrics): (SerializableMessage, Option<ExecutionMetrics>) = match result {
Ok((rows, metrics)) => (
into_message(OneOffQueryResponseMessage {
message_id,
error: None,
results: vec![rows],
total_host_execution_duration,
}),
Some(metrics),
),
Ok((rows, metrics)) => {
tx.metrics.merge(metrics);
(
into_message(OneOffQueryResponseMessage {
message_id,
error: None,
results: vec![rows],
total_host_execution_duration,
}),
Some(metrics),
)
}
Err(err) => (
into_message(OneOffQueryResponseMessage {
message_id,
@@ -2986,7 +2980,7 @@ impl ModuleHost {
rlb_pool,
} = params;
let (tx_offset_sender, tx_offset_receiver) = oneshot::channel();
let tx = scopeguard::guard(db.begin_tx(Workload::Sql), |tx| {
let mut tx = scopeguard::guard(db.begin_tx(Workload::Sql), |tx| {
let (tx_offset, tx_metrics, reducer) = db.release_tx(tx);
let _ = tx_offset_sender.send(tx_offset);
db.report_read_tx_metrics(reducer, tx_metrics);
@@ -3048,15 +3042,18 @@ impl ModuleHost {
})();
let (message, metrics) = match result {
Ok((rows, metrics)) => (
ws_v2::OneOffQueryResult {
request_id,
result: Ok(ws_v2::QueryRows {
tables: vec![rows].into_boxed_slice(),
}),
},
Some(metrics),
),
Ok((rows, metrics)) => {
tx.metrics.merge(metrics);
(
ws_v2::OneOffQueryResult {
request_id,
result: Ok(ws_v2::QueryRows {
tables: vec![rows].into_boxed_slice(),
}),
},
Some(metrics),
)
}
Err(err) => (
ws_v2::OneOffQueryResult {
request_id,
@@ -3070,15 +3067,7 @@ impl ModuleHost {
Ok(metrics)
}
pub(in crate::host) fn record_one_off_query_metrics_for_result(
info: &ModuleInfo,
db: &RelationalDB,
timer: Instant,
result: &OneOffQueryResult,
) {
if let Ok(Some(metrics)) = result {
db.exec_counters_for(WorkloadType::Sql).record(metrics);
}
pub(in crate::host) fn record_one_off_query_round_trip(info: &ModuleInfo, timer: Instant) {
info.metrics
.request_round_trip_sql
.observe(timer.elapsed().as_secs_f64());
+5 -25
View File
@@ -1344,13 +1344,8 @@ fn handle_main_worker_request(
}),
JsMainWorkerRequest::CallViewDetached { cmd, metric, on_panic } => {
handle_detached_worker_request("call_view", on_panic, || {
let (res, trapped) = instance_common.handle_cmd(cmd, inst);
ModuleHost::record_view_command_metrics_for_result(
&module_common.info(),
module_common.replica_ctx().relational_db(),
metric,
&res,
);
let (_, trapped) = instance_common.handle_cmd(cmd, inst);
ModuleHost::record_view_command_round_trip(&module_common.info(), metric);
trapped
})
}
@@ -1365,12 +1360,7 @@ fn handle_main_worker_request(
if let Err(err) = &res {
log::warn!("detached one-off query failed: {err:#}");
}
ModuleHost::record_one_off_query_metrics_for_result(
&module_common.info(),
module_common.replica_ctx().relational_db(),
timer,
&res,
);
ModuleHost::record_one_off_query_round_trip(&module_common.info(), timer);
false
})
}
@@ -1381,12 +1371,7 @@ fn handle_main_worker_request(
if let Err(err) = &res {
log::warn!("detached one-off query failed: {err:#}");
}
ModuleHost::record_one_off_query_metrics_for_result(
&module_common.info(),
module_common.replica_ctx().relational_db(),
timer,
&res,
);
ModuleHost::record_one_off_query_round_trip(&module_common.info(), timer);
false
})
}
@@ -1397,12 +1382,7 @@ fn handle_main_worker_request(
if let Err(err) = &res {
log::warn!("detached one-off query failed: {err:#}");
}
ModuleHost::record_one_off_query_metrics_for_result(
&module_common.info(),
module_common.replica_ctx().relational_db(),
timer,
&res,
);
ModuleHost::record_one_off_query_round_trip(&module_common.info(), timer);
false
})
}
@@ -666,7 +666,7 @@ impl ModuleSubscriptions {
let mut_tx = ScopeGuard::<MutTxId, _>::into_inner(mut_tx);
let (tx, tx_offset, trapped) =
let (mut tx, tx_offset, trapped) =
self.materialize_views_and_downgrade_tx(mut_tx, instance, &query, auth.caller())?;
let (table_rows, metrics) = return_on_err_with_sql_bool!(
@@ -674,6 +674,7 @@ impl ModuleSubscriptions {
query.sql(),
send_err_msg
);
tx.metrics.merge(metrics);
// It acquires the subscription lock after `eval`, allowing `add_subscription` to run concurrently.
// This also makes it possible for `broadcast_event` to get scheduled before the subsequent part here
@@ -751,13 +752,14 @@ impl ModuleSubscriptions {
return Ok(None);
};
let (tx, tx_offset) = self.unsubscribe_views(query, auth.caller())?;
let (mut tx, tx_offset) = self.unsubscribe_views(query, auth.caller())?;
let (table_rows, metrics) = return_on_err_with_sql!(
self.evaluate_initial_subscription(sender.clone(), query.clone(), &tx, &auth, TableUpdateType::Unsubscribe),
query.sql(),
send_err_msg
);
tx.metrics.merge(metrics);
// Note: to make sure transaction updates are consistent, we need to put this in the broadcast
// queue while we are still holding a read-lock on the database.
@@ -831,7 +833,7 @@ impl ModuleSubscriptions {
};
let mut_tx = ScopeGuard::<MutTxId, _>::into_inner(mut_tx);
let (tx, tx_offset) = self.unsubscribe_views_and_downgrade_tx(mut_tx, &removed_queries, auth.caller())?;
let (mut tx, tx_offset) = self.unsubscribe_views_and_downgrade_tx(mut_tx, &removed_queries, auth.caller())?;
let (update, metrics) = return_on_err!(
self.evaluate_queries(
@@ -844,6 +846,7 @@ impl ModuleSubscriptions {
send_err_msg,
None
);
tx.metrics.merge(metrics);
// How many queries did we evaluate?
subscription_metrics
@@ -947,7 +950,7 @@ impl ModuleSubscriptions {
};
let mut_tx = ScopeGuard::<MutTxId, _>::into_inner(mut_tx);
let (tx, tx_offset) = self.unsubscribe_views_and_downgrade_tx(mut_tx, &removed_queries, auth.caller())?;
let (mut tx, tx_offset) = self.unsubscribe_views_and_downgrade_tx(mut_tx, &removed_queries, auth.caller())?;
let (rows, metrics) = if request.flags == ws_v2::UnsubscribeFlags::SendDroppedRows {
let (update, metrics) = return_on_err!(
@@ -977,6 +980,9 @@ impl ModuleSubscriptions {
} else {
(None, None)
};
if let Some(metrics) = metrics {
tx.metrics.merge(metrics);
}
let _ = self.broadcast_queue.send_client_message_v2(
sender.clone(),
@@ -1257,11 +1263,12 @@ impl ModuleSubscriptions {
let mut_tx = ScopeGuard::<MutTxId, _>::into_inner(mut_tx);
let (tx, tx_offset, trapped) =
let (mut tx, tx_offset, trapped) =
self.materialize_views_and_downgrade_tx(mut_tx, instance, &queries, auth.caller())?;
let (update, metrics) =
self.evaluate_queries(sender.clone(), &queries, &tx, &auth, TableUpdateType::Subscribe)?;
tx.metrics.merge(metrics);
subscription_metrics.num_queries_evaluated.inc_by(queries.len() as _);
@@ -1362,7 +1369,7 @@ impl ModuleSubscriptions {
let mut_tx = ScopeGuard::<MutTxId, _>::into_inner(mut_tx);
let (tx, tx_offset, trapped) =
let (mut tx, tx_offset, trapped) =
self.materialize_views_and_downgrade_tx(mut_tx, instance, &queries, auth.caller())?;
let Ok((update, metrics)) =
@@ -1383,6 +1390,7 @@ impl ModuleSubscriptions {
send_err_msg("Internal error evaluating queries".into());
return Ok((None, trapped));
};
tx.metrics.merge(metrics);
// How many queries did we actually evaluate?
subscription_metrics.num_queries_evaluated.inc_by(queries.len() as _);
@@ -1479,7 +1487,7 @@ impl ModuleSubscriptions {
subscription_metrics,
)?;
let (tx, tx_offset, trapped) =
let (mut tx, tx_offset, trapped) =
self.materialize_views_and_downgrade_tx(mut_tx, instance, &queries, auth.caller())?;
check_row_limit(
@@ -1497,16 +1505,22 @@ impl ModuleSubscriptions {
// Record how long it took to compile the subscription
drop(compile_timer);
let tx = DeltaTx::from(&*tx);
let delta_tx = DeltaTx::from(&*tx);
let (database_update, metrics, query_metrics) = match sender.config.protocol {
Protocol::Binary => execute_plans(&auth, &queries, &tx, TableUpdateType::Subscribe, &self.bsatn_rlb_pool)
.map(|(table_update, metrics, query_metrics)| {
Protocol::Binary => execute_plans(
&auth,
&queries,
&delta_tx,
TableUpdateType::Subscribe,
&self.bsatn_rlb_pool,
)
.map(|(table_update, metrics, query_metrics)| {
(ws_v1::FormatSwitch::Bsatn(table_update), metrics, query_metrics)
})?,
Protocol::Text => execute_plans(
&auth,
&queries,
&tx,
&delta_tx,
TableUpdateType::Subscribe,
&JsonRowListBuilderFakePool,
)
@@ -1516,6 +1530,7 @@ impl ModuleSubscriptions {
};
record_query_metrics(&self.relational_db.database_identity(), query_metrics);
tx.metrics.merge(metrics);
// It acquires the subscription lock after `eval`, allowing `add_subscription` to run concurrently.
// This also makes it possible for `broadcast_event` to get scheduled before the subsequent part here
@@ -1730,7 +1745,7 @@ impl ModuleSubscriptions {
sender: Identity,
) -> Result<(TxGuard<impl FnOnce(TxId) + '_>, TransactionOffset), DBError> {
Self::_unsubscribe_views(&mut tx, view_collector, sender)?;
let (tx_data, tx_metrics_mut, tx) = self.relational_db.commit_tx_downgrade(tx, Workload::Subscribe);
let (tx_data, tx_metrics_mut, tx) = self.relational_db.commit_tx_downgrade(tx, Workload::Unsubscribe);
let opts = GuardTxOptions::from_mut(tx_data, tx_metrics_mut);
Ok(self.guard_tx(tx, opts))
}