mirror of
https://github.com/clockworklabs/SpacetimeDB.git
synced 2026-05-06 07:26:43 -04:00
Refactor and extract call_reducer and update_database machinery for reuse by V8 (#3190)
# Description of Changes Refactors the machinery of call_reducer and update_database to a) have smaller pieces that make up the whole so that the code becomes clearer b) extract all the VM-independent stuff so that it can be reused by V8 modules. This is best reviewed commit by commit. # API and ABI breaking changes None. # Expected complexity level and risk 2, it's an important place, but this is doing just code motion. # Testing No semantic changes, just code motion.
This commit is contained in:
committed by
GitHub
parent
be1bd22518
commit
2a070360cf
@@ -22,7 +22,7 @@ pub(super) struct KeyCache {
|
||||
tag: Option<Global<v8::String>>,
|
||||
/// The `value` property for sum values in JS.
|
||||
value: Option<Global<v8::String>>,
|
||||
/// The `describe_module` property on the global proxy object.
|
||||
/// The `__describe_module__` property on the global proxy object.
|
||||
describe_module: Option<Global<v8::String>>,
|
||||
}
|
||||
|
||||
@@ -37,9 +37,9 @@ impl KeyCache {
|
||||
Self::get_or_create_key(scope, &mut self.value, "value")
|
||||
}
|
||||
|
||||
/// Returns the `describe_module` property name.
|
||||
/// Returns the `__describe_module__` property name.
|
||||
pub(super) fn describe_module<'scope>(&mut self, scope: &mut HandleScope<'scope>) -> Local<'scope, v8::String> {
|
||||
Self::get_or_create_key(scope, &mut self.describe_module, "describe_module")
|
||||
Self::get_or_create_key(scope, &mut self.describe_module, "__describe_module__")
|
||||
}
|
||||
|
||||
/// Returns an interned string corresponding to `string`
|
||||
|
||||
@@ -134,9 +134,9 @@ impl ModuleInstance for JsInstance {
|
||||
}
|
||||
}
|
||||
|
||||
// Calls the `describe_module` function on the global proxy object to extract a [`RawModuleDef`].
|
||||
// Calls the `__describe_module__` function on the global proxy object to extract a [`RawModuleDef`].
|
||||
fn call_describe_module(scope: &mut HandleScope<'_>) -> anyhow::Result<RawModuleDef> {
|
||||
// Get a cached version of the `describe_module` property.
|
||||
// Get a cached version of the `__describe_module__` property.
|
||||
let key_cache = get_or_create_key_cache(scope);
|
||||
let describe_module_key = key_cache.borrow_mut().describe_module(scope).into();
|
||||
|
||||
@@ -150,7 +150,7 @@ fn call_describe_module(scope: &mut HandleScope<'_>) -> anyhow::Result<RawModule
|
||||
|
||||
// Convert to a function.
|
||||
let fun =
|
||||
cast!(scope, object, Function, "function export for `describe_module`").map_err(|e| e.throw(scope))?;
|
||||
cast!(scope, object, Function, "function export for `__describe_module__`").map_err(|e| e.throw(scope))?;
|
||||
|
||||
// Call the function.
|
||||
let receiver = v8::undefined(scope).into();
|
||||
@@ -183,7 +183,7 @@ mod test {
|
||||
#[test]
|
||||
fn call_describe_module_works() {
|
||||
let code = r#"
|
||||
function describe_module() {
|
||||
function __describe_module__() {
|
||||
return {
|
||||
"tag": "V9",
|
||||
"value": {
|
||||
|
||||
@@ -1,12 +1,14 @@
|
||||
use bytes::Bytes;
|
||||
use prometheus::IntGauge;
|
||||
use prometheus::{Histogram, IntCounter, IntGauge};
|
||||
use spacetimedb_lib::db::raw_def::v9::Lifecycle;
|
||||
use spacetimedb_schema::auto_migrate::ponder_migrate;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use tracing::span::EnteredSpan;
|
||||
|
||||
use super::instrumentation::CallTimes;
|
||||
use crate::database_logger::{self, SystemLogger};
|
||||
use crate::client::ClientConnectionSender;
|
||||
use crate::database_logger;
|
||||
use crate::energy::{EnergyMonitor, EnergyQuanta, ReducerBudget, ReducerFingerprint};
|
||||
use crate::host::instance_env::InstanceEnv;
|
||||
use crate::host::module_common::{build_common_module_from_raw, ModuleCommon};
|
||||
@@ -20,7 +22,7 @@ use crate::messages::control_db::HostType;
|
||||
use crate::module_host_context::ModuleCreationContext;
|
||||
use crate::replica_context::ReplicaContext;
|
||||
use crate::subscription::module_subscription_actor::WriteConflict;
|
||||
use crate::util::prometheus_handle::HistogramExt;
|
||||
use crate::util::prometheus_handle::{HistogramExt, TimerGuard};
|
||||
use crate::worker_metrics::WORKER_METRICS;
|
||||
use spacetimedb_datastore::db_metrics::DB_METRICS;
|
||||
use spacetimedb_datastore::execution_context::{self, ReducerContext, Workload};
|
||||
@@ -53,11 +55,9 @@ pub trait WasmInstance: Send + Sync + 'static {
|
||||
|
||||
fn instance_env(&self) -> &InstanceEnv;
|
||||
|
||||
type Trap: Send;
|
||||
fn call_reducer(&mut self, op: ReducerOp<'_>, budget: ReducerBudget) -> ExecuteResult;
|
||||
|
||||
fn call_reducer(&mut self, op: ReducerOp<'_>, budget: ReducerBudget) -> ExecuteResult<Self::Trap>;
|
||||
|
||||
fn log_traceback(func_type: &str, func: &str, trap: &Self::Trap);
|
||||
fn log_traceback(func_type: &str, func: &str, trap: &anyhow::Error);
|
||||
}
|
||||
|
||||
pub struct EnergyStats {
|
||||
@@ -71,11 +71,11 @@ pub struct ExecutionTimings {
|
||||
pub wasm_instance_env_call_times: CallTimes,
|
||||
}
|
||||
|
||||
pub struct ExecuteResult<E> {
|
||||
pub struct ExecuteResult {
|
||||
pub energy: EnergyStats,
|
||||
pub timings: ExecutionTimings,
|
||||
pub memory_allocation: usize,
|
||||
pub call_result: Result<Result<(), Box<str>>, E>,
|
||||
pub call_result: Result<Result<(), Box<str>>, anyhow::Error>,
|
||||
}
|
||||
|
||||
pub(crate) struct WasmModuleHostActor<T: WasmModule> {
|
||||
@@ -161,8 +161,7 @@ impl<T: WasmModule> WasmModuleHostActor<T> {
|
||||
|
||||
impl<T: WasmModule> WasmModuleHostActor<T> {
|
||||
fn make_from_instance(&self, instance: T::Instance) -> WasmModuleInstance<T::Instance> {
|
||||
WasmModuleInstance {
|
||||
instance,
|
||||
let common = InstanceCommon {
|
||||
info: self.common.info(),
|
||||
energy_monitor: self.common.energy_monitor(),
|
||||
// will be updated on the first reducer call
|
||||
@@ -171,7 +170,8 @@ impl<T: WasmModule> WasmModuleHostActor<T> {
|
||||
.wasm_memory_bytes
|
||||
.with_label_values(self.common.database_identity()),
|
||||
trapped: false,
|
||||
}
|
||||
};
|
||||
WasmModuleInstance { instance, common }
|
||||
}
|
||||
}
|
||||
|
||||
@@ -214,72 +214,29 @@ impl<T: WasmModule> Module for WasmModuleHostActor<T> {
|
||||
|
||||
pub struct WasmModuleInstance<T: WasmInstance> {
|
||||
instance: T,
|
||||
info: Arc<ModuleInfo>,
|
||||
energy_monitor: Arc<dyn EnergyMonitor>,
|
||||
allocated_memory: usize,
|
||||
metric_wasm_memory_bytes: IntGauge,
|
||||
trapped: bool,
|
||||
common: InstanceCommon,
|
||||
}
|
||||
|
||||
impl<T: WasmInstance> std::fmt::Debug for WasmModuleInstance<T> {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
f.debug_struct("WasmInstanceActor")
|
||||
.field("trapped", &self.trapped)
|
||||
.field("trapped", &self.common.trapped)
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: WasmInstance> WasmModuleInstance<T> {
|
||||
fn replica_context(&self) -> &ReplicaContext {
|
||||
&self.instance.instance_env().replica_ctx
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: WasmInstance> ModuleInstance for WasmModuleInstance<T> {
|
||||
fn trapped(&self) -> bool {
|
||||
self.trapped
|
||||
self.common.trapped
|
||||
}
|
||||
|
||||
#[tracing::instrument(level = "trace", skip_all)]
|
||||
fn update_database(
|
||||
&mut self,
|
||||
program: Program,
|
||||
old_module_info: Arc<ModuleInfo>,
|
||||
) -> Result<UpdateDatabaseResult, anyhow::Error> {
|
||||
let plan = ponder_migrate(&old_module_info.module_def, &self.info.module_def);
|
||||
let plan = match plan {
|
||||
Ok(plan) => plan,
|
||||
Err(errs) => {
|
||||
return Ok(UpdateDatabaseResult::AutoMigrateError(errs));
|
||||
}
|
||||
};
|
||||
let stdb = &*self.replica_context().relational_db;
|
||||
|
||||
let program_hash = program.hash;
|
||||
let tx = stdb.begin_mut_tx(IsolationLevel::Serializable, Workload::Internal);
|
||||
let (mut tx, _) = stdb.with_auto_rollback(tx, |tx| stdb.update_program(tx, HostType::Wasm, program))?;
|
||||
self.system_logger().info(&format!("Updated program to {program_hash}"));
|
||||
|
||||
let auth_ctx = AuthCtx::for_current(self.replica_context().database.owner_identity);
|
||||
let res = crate::db::update::update_database(stdb, &mut tx, auth_ctx, plan, self.system_logger());
|
||||
|
||||
match res {
|
||||
Err(e) => {
|
||||
log::warn!("Database update failed: {} @ {}", e, stdb.database_identity());
|
||||
self.system_logger().warn(&format!("Database update failed: {e}"));
|
||||
let (tx_metrics, reducer) = stdb.rollback_mut_tx(tx);
|
||||
stdb.report_mut_tx_metrics(reducer, tx_metrics, None);
|
||||
Ok(UpdateDatabaseResult::ErrorExecutingMigration(e))
|
||||
}
|
||||
Ok(()) => {
|
||||
if let Some((tx_data, tx_metrics, reducer)) = stdb.commit_tx(tx)? {
|
||||
stdb.report_mut_tx_metrics(reducer, tx_metrics, Some(tx_data));
|
||||
}
|
||||
self.system_logger().info("Database updated");
|
||||
log::info!("Database updated, {}", stdb.database_identity());
|
||||
Ok(UpdateDatabaseResult::UpdatePerformed)
|
||||
}
|
||||
}
|
||||
let replica_ctx = &self.instance.instance_env().replica_ctx;
|
||||
self.common.update_database(replica_ctx, program, old_module_info)
|
||||
}
|
||||
|
||||
fn call_reducer(&mut self, tx: Option<MutTxId>, params: CallReducerParams) -> ReducerCallResult {
|
||||
@@ -288,6 +245,78 @@ impl<T: WasmInstance> ModuleInstance for WasmModuleInstance<T> {
|
||||
}
|
||||
|
||||
impl<T: WasmInstance> WasmModuleInstance<T> {
|
||||
#[tracing::instrument(level = "trace", skip_all)]
|
||||
fn call_reducer_with_tx(&mut self, tx: Option<MutTxId>, params: CallReducerParams) -> ReducerCallResult {
|
||||
self.common.call_reducer_with_tx(
|
||||
&self.instance.instance_env().replica_ctx.clone(),
|
||||
tx,
|
||||
params,
|
||||
|ty, fun, err| T::log_traceback(ty, fun, err),
|
||||
|tx, op, budget| {
|
||||
self.instance
|
||||
.instance_env()
|
||||
.tx
|
||||
.clone()
|
||||
.set(tx, || self.instance.call_reducer(op, budget))
|
||||
},
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
struct InstanceCommon {
|
||||
info: Arc<ModuleInfo>,
|
||||
energy_monitor: Arc<dyn EnergyMonitor>,
|
||||
allocated_memory: usize,
|
||||
metric_wasm_memory_bytes: IntGauge,
|
||||
trapped: bool,
|
||||
}
|
||||
|
||||
impl InstanceCommon {
|
||||
#[tracing::instrument(level = "trace", skip_all)]
|
||||
fn update_database(
|
||||
&mut self,
|
||||
replica_ctx: &ReplicaContext,
|
||||
program: Program,
|
||||
old_module_info: Arc<ModuleInfo>,
|
||||
) -> Result<UpdateDatabaseResult, anyhow::Error> {
|
||||
let system_logger = replica_ctx.logger.system_logger();
|
||||
let stdb = &replica_ctx.relational_db;
|
||||
|
||||
let plan = ponder_migrate(&old_module_info.module_def, &self.info.module_def);
|
||||
let plan = match plan {
|
||||
Ok(plan) => plan,
|
||||
Err(errs) => {
|
||||
return Ok(UpdateDatabaseResult::AutoMigrateError(errs));
|
||||
}
|
||||
};
|
||||
|
||||
let program_hash = program.hash;
|
||||
let tx = stdb.begin_mut_tx(IsolationLevel::Serializable, Workload::Internal);
|
||||
let (mut tx, _) = stdb.with_auto_rollback(tx, |tx| stdb.update_program(tx, HostType::Wasm, program))?;
|
||||
system_logger.info(&format!("Updated program to {program_hash}"));
|
||||
|
||||
let auth_ctx = AuthCtx::for_current(replica_ctx.database.owner_identity);
|
||||
let res = crate::db::update::update_database(stdb, &mut tx, auth_ctx, plan, system_logger);
|
||||
|
||||
match res {
|
||||
Err(e) => {
|
||||
log::warn!("Database update failed: {} @ {}", e, stdb.database_identity());
|
||||
system_logger.warn(&format!("Database update failed: {e}"));
|
||||
let (tx_metrics, reducer) = stdb.rollback_mut_tx(tx);
|
||||
stdb.report_mut_tx_metrics(reducer, tx_metrics, None);
|
||||
Ok(UpdateDatabaseResult::ErrorExecutingMigration(e))
|
||||
}
|
||||
Ok(()) => {
|
||||
if let Some((tx_data, tx_metrics, reducer)) = stdb.commit_tx(tx)? {
|
||||
stdb.report_mut_tx_metrics(reducer, tx_metrics, Some(tx_data));
|
||||
}
|
||||
system_logger.info("Database updated");
|
||||
log::info!("Database updated, {}", stdb.database_identity());
|
||||
Ok(UpdateDatabaseResult::UpdatePerformed)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Execute a reducer.
|
||||
///
|
||||
/// If `Some` [`MutTxId`] is supplied, the reducer is called within the
|
||||
@@ -304,8 +333,14 @@ impl<T: WasmInstance> WasmModuleInstance<T> {
|
||||
/// The method also performs various measurements and records energy usage,
|
||||
/// as well as broadcasting a [`ModuleEvent`] containing information about
|
||||
/// the outcome of the call.
|
||||
#[tracing::instrument(level = "trace", skip_all)]
|
||||
fn call_reducer_with_tx(&mut self, tx: Option<MutTxId>, params: CallReducerParams) -> ReducerCallResult {
|
||||
fn call_reducer_with_tx(
|
||||
&mut self,
|
||||
replica_ctx: &ReplicaContext,
|
||||
tx: Option<MutTxId>,
|
||||
params: CallReducerParams,
|
||||
log_traceback: impl FnOnce(&str, &str, &anyhow::Error),
|
||||
vm_call_reducer: impl FnOnce(MutTxId, ReducerOp<'_>, ReducerBudget) -> (MutTxId, ExecuteResult),
|
||||
) -> ReducerCallResult {
|
||||
let CallReducerParams {
|
||||
timestamp,
|
||||
caller_identity,
|
||||
@@ -318,18 +353,17 @@ impl<T: WasmInstance> WasmModuleInstance<T> {
|
||||
} = params;
|
||||
let caller_connection_id_opt = (caller_connection_id != ConnectionId::ZERO).then_some(caller_connection_id);
|
||||
|
||||
let replica_ctx = self.replica_context();
|
||||
let stdb = &*replica_ctx.relational_db.clone();
|
||||
let database_identity = replica_ctx.database_identity;
|
||||
let reducer_def = self.info.module_def.reducer_by_id(reducer_id);
|
||||
let reducer_name = &*reducer_def.name;
|
||||
let reducer = reducer_name.to_string();
|
||||
|
||||
let _outer_span = tracing::trace_span!("call_reducer",
|
||||
reducer_name,
|
||||
%caller_identity,
|
||||
caller_connection_id = caller_connection_id_opt.map(tracing::field::debug),
|
||||
)
|
||||
.entered();
|
||||
// Do some `with_label_values`.
|
||||
// TODO(perf, centril): consider caching this.
|
||||
let vm_metrics = VmMetrics::new(&database_identity, reducer_name);
|
||||
|
||||
let _outer_span = start_call_reducer_span(reducer_name, &caller_identity, caller_connection_id_opt);
|
||||
|
||||
let energy_fingerprint = ReducerFingerprint {
|
||||
module_hash: self.info.module_hash,
|
||||
@@ -348,35 +382,13 @@ impl<T: WasmInstance> WasmModuleInstance<T> {
|
||||
arg_bytes: args.get_bsatn().clone(),
|
||||
};
|
||||
|
||||
// Before we take the lock, do some `with_label_values`.
|
||||
let metric_reducer_plus_query_duration = WORKER_METRICS
|
||||
.reducer_plus_query_duration
|
||||
.with_label_values(&database_identity, op.name);
|
||||
let metric_reducer_wasmtime_fuel_used = DB_METRICS
|
||||
.reducer_wasmtime_fuel_used
|
||||
.with_label_values(&database_identity, reducer_name);
|
||||
let metric_reducer_duration_usec = DB_METRICS
|
||||
.reducer_duration_usec
|
||||
.with_label_values(&database_identity, reducer_name);
|
||||
let metric_reducer_abi_time_usec = DB_METRICS
|
||||
.reducer_abi_time_usec
|
||||
.with_label_values(&database_identity, reducer_name);
|
||||
|
||||
let workload = Workload::Reducer(ReducerContext::from(op.clone()));
|
||||
let tx = tx.unwrap_or_else(|| stdb.begin_mut_tx(IsolationLevel::Serializable, workload));
|
||||
let _guard = metric_reducer_plus_query_duration.with_timer(tx.timer);
|
||||
let _guard = vm_metrics.timer_guard_for_reducer_plus_query(tx.timer);
|
||||
|
||||
let mut tx_slot = self.instance.instance_env().tx.clone();
|
||||
let reducer_span = start_run_reducer_span(budget);
|
||||
|
||||
let reducer_span = tracing::trace_span!(
|
||||
"run_reducer",
|
||||
timings.total_duration = tracing::field::Empty,
|
||||
energy.budget = budget.get(),
|
||||
energy.used = tracing::field::Empty,
|
||||
)
|
||||
.entered();
|
||||
|
||||
let (mut tx, result) = tx_slot.set(tx, || self.instance.call_reducer(op, budget));
|
||||
let (mut tx, result) = vm_call_reducer(tx, op, budget);
|
||||
|
||||
let ExecuteResult {
|
||||
energy,
|
||||
@@ -385,9 +397,11 @@ impl<T: WasmInstance> WasmModuleInstance<T> {
|
||||
call_result,
|
||||
} = result;
|
||||
|
||||
metric_reducer_wasmtime_fuel_used.inc_by(energy.wasmtime_fuel_used);
|
||||
metric_reducer_duration_usec.inc_by(timings.total_duration.as_micros() as u64);
|
||||
metric_reducer_abi_time_usec.inc_by(timings.wasm_instance_env_call_times.sum().as_micros() as u64);
|
||||
vm_metrics.report(
|
||||
energy.wasmtime_fuel_used,
|
||||
timings.total_duration,
|
||||
&timings.wasm_instance_env_call_times,
|
||||
);
|
||||
|
||||
self.energy_monitor
|
||||
.record_reducer(&energy_fingerprint, energy.used, timings.total_duration);
|
||||
@@ -400,20 +414,12 @@ impl<T: WasmInstance> WasmModuleInstance<T> {
|
||||
.record("timings.total_duration", tracing::field::debug(timings.total_duration))
|
||||
.record("energy.used", tracing::field::debug(energy.used));
|
||||
|
||||
const FRAME_LEN_60FPS: Duration = Duration::from_secs(1).checked_div(60).unwrap();
|
||||
if timings.total_duration > FRAME_LEN_60FPS {
|
||||
// If we can't get your reducer done in a single frame we should debug it.
|
||||
tracing::debug!(
|
||||
message = "Long running reducer finished executing",
|
||||
reducer_name,
|
||||
?timings.total_duration,
|
||||
);
|
||||
}
|
||||
maybe_log_long_running_reducer(reducer_name, timings.total_duration);
|
||||
reducer_span.exit();
|
||||
|
||||
let status = match call_result {
|
||||
Err(err) => {
|
||||
T::log_traceback("reducer", reducer_name, &err);
|
||||
log_traceback("reducer", reducer_name, &err);
|
||||
|
||||
WORKER_METRICS
|
||||
.wasm_instance_errors
|
||||
@@ -434,40 +440,24 @@ impl<T: WasmInstance> WasmModuleInstance<T> {
|
||||
EventStatus::Failed("The Wasm instance encountered a fatal error.".into())
|
||||
}
|
||||
}
|
||||
Ok(Err(errmsg)) => {
|
||||
log::info!("reducer returned error: {errmsg}");
|
||||
|
||||
self.replica_context().logger.write(
|
||||
database_logger::LogLevel::Error,
|
||||
&database_logger::Record {
|
||||
ts: chrono::DateTime::from_timestamp_micros(timestamp.to_micros_since_unix_epoch()).unwrap(),
|
||||
target: Some(reducer_name),
|
||||
filename: None,
|
||||
line_number: None,
|
||||
message: &errmsg,
|
||||
},
|
||||
&(),
|
||||
);
|
||||
EventStatus::Failed(errmsg.into())
|
||||
}
|
||||
// We haven't actually committed yet - `commit_and_broadcast_event` will commit
|
||||
// for us and replace this with the actual database update.
|
||||
//
|
||||
// Detecting a new client, and inserting it in `st_clients`
|
||||
// and conversely removing from `st_clients` on disconnect.
|
||||
Ok(Ok(())) => {
|
||||
let res = match reducer_def.lifecycle {
|
||||
Some(Lifecycle::OnConnect) => tx.insert_st_client(caller_identity, caller_connection_id),
|
||||
Some(Lifecycle::OnDisconnect) => {
|
||||
tx.delete_st_client(caller_identity, caller_connection_id, database_identity)
|
||||
}
|
||||
_ => Ok(()),
|
||||
};
|
||||
match res {
|
||||
Ok(()) => EventStatus::Committed(DatabaseUpdate::default()),
|
||||
Err(err) => EventStatus::Failed(err.to_string()),
|
||||
Ok(res) => match res.and_then(|()| {
|
||||
lifecyle_modifications_to_tx(
|
||||
reducer_def.lifecycle,
|
||||
caller_identity,
|
||||
caller_connection_id,
|
||||
database_identity,
|
||||
&mut tx,
|
||||
)
|
||||
}) {
|
||||
Ok(()) => EventStatus::Committed(DatabaseUpdate::default()),
|
||||
Err(err) => {
|
||||
log::info!("reducer returned error: {err}");
|
||||
log_reducer_error(replica_ctx, timestamp, reducer_name, &err);
|
||||
EventStatus::Failed(err.into())
|
||||
}
|
||||
}
|
||||
},
|
||||
};
|
||||
|
||||
let event = ModuleEvent {
|
||||
@@ -475,7 +465,7 @@ impl<T: WasmInstance> WasmModuleInstance<T> {
|
||||
caller_identity,
|
||||
caller_connection_id: caller_connection_id_opt,
|
||||
function_call: ModuleFunctionCall {
|
||||
reducer: reducer_name.to_owned(),
|
||||
reducer,
|
||||
reducer_id,
|
||||
args,
|
||||
},
|
||||
@@ -485,15 +475,7 @@ impl<T: WasmInstance> WasmModuleInstance<T> {
|
||||
request_id,
|
||||
timer,
|
||||
};
|
||||
let (event, _) = match self
|
||||
.info
|
||||
.subscriptions
|
||||
.commit_and_broadcast_event(client, event, tx)
|
||||
.unwrap()
|
||||
{
|
||||
Ok(ev) => ev,
|
||||
Err(WriteConflict) => todo!("Write skew, you need to implement retries my man, T-dawg."),
|
||||
};
|
||||
let event = commit_and_broadcast_event(&self.info, client, event, tx);
|
||||
|
||||
ReducerCallResult {
|
||||
outcome: ReducerOutcome::from(&event.status),
|
||||
@@ -501,10 +483,138 @@ impl<T: WasmInstance> WasmModuleInstance<T> {
|
||||
execution_duration: timings.total_duration,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Helpers - NOT API
|
||||
fn system_logger(&self) -> &SystemLogger {
|
||||
self.replica_context().logger.system_logger()
|
||||
/// VM-related metrics for reducer execution.
|
||||
struct VmMetrics {
|
||||
/// The time spent executing a reducer + plus evaluating its subscription queries.
|
||||
reducer_plus_query_duration: Histogram,
|
||||
/// The total VM fuel used.
|
||||
reducer_fuel_used: IntCounter,
|
||||
/// The total runtime of reducer calls.
|
||||
reducer_duration_usec: IntCounter,
|
||||
/// The total time spent in reducer ABI calls.
|
||||
reducer_abi_time_usec: IntCounter,
|
||||
}
|
||||
|
||||
impl VmMetrics {
|
||||
/// Returns new metrics counters for `database_identity` and `reducer_name`.
|
||||
fn new(database_identity: &Identity, reducer_name: &str) -> Self {
|
||||
let reducer_plus_query_duration = WORKER_METRICS
|
||||
.reducer_plus_query_duration
|
||||
.with_label_values(database_identity, reducer_name);
|
||||
let reducer_fuel_used = DB_METRICS
|
||||
.reducer_wasmtime_fuel_used
|
||||
.with_label_values(database_identity, reducer_name);
|
||||
let reducer_duration_usec = DB_METRICS
|
||||
.reducer_duration_usec
|
||||
.with_label_values(database_identity, reducer_name);
|
||||
let reducer_abi_time_usec = DB_METRICS
|
||||
.reducer_abi_time_usec
|
||||
.with_label_values(database_identity, reducer_name);
|
||||
|
||||
Self {
|
||||
reducer_plus_query_duration,
|
||||
reducer_fuel_used,
|
||||
reducer_duration_usec,
|
||||
reducer_abi_time_usec,
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns a timer guard for `reducer_plus_query_duration`.
|
||||
fn timer_guard_for_reducer_plus_query(&self, start: Instant) -> TimerGuard {
|
||||
self.reducer_plus_query_duration.clone().with_timer(start)
|
||||
}
|
||||
|
||||
/// Reports some VM metrics.
|
||||
fn report(&self, fuel_used: u64, reducer_duration: Duration, abi_time: &CallTimes) {
|
||||
self.reducer_fuel_used.inc_by(fuel_used);
|
||||
self.reducer_duration_usec.inc_by(reducer_duration.as_micros() as u64);
|
||||
self.reducer_abi_time_usec.inc_by(abi_time.sum().as_micros() as u64);
|
||||
}
|
||||
}
|
||||
|
||||
/// Starts the `call_reducer` span.
|
||||
fn start_call_reducer_span(
|
||||
reducer_name: &str,
|
||||
caller_identity: &Identity,
|
||||
caller_connection_id_opt: Option<ConnectionId>,
|
||||
) -> EnteredSpan {
|
||||
tracing::trace_span!("call_reducer",
|
||||
reducer_name,
|
||||
%caller_identity,
|
||||
caller_connection_id = caller_connection_id_opt.map(tracing::field::debug),
|
||||
)
|
||||
.entered()
|
||||
}
|
||||
|
||||
/// Starts the `run_reducer` span.
|
||||
fn start_run_reducer_span(budget: ReducerBudget) -> EnteredSpan {
|
||||
tracing::trace_span!(
|
||||
"run_reducer",
|
||||
timings.total_duration = tracing::field::Empty,
|
||||
energy.budget = budget.get(),
|
||||
energy.used = tracing::field::Empty,
|
||||
)
|
||||
.entered()
|
||||
}
|
||||
|
||||
/// Logs a tracing message if a reducer doesn't finish in a single frame at 60 FPS.
|
||||
fn maybe_log_long_running_reducer(reducer_name: &str, total_duration: Duration) {
|
||||
const FRAME_LEN_60FPS: Duration = Duration::from_secs(1).checked_div(60).unwrap();
|
||||
if total_duration > FRAME_LEN_60FPS {
|
||||
tracing::debug!(
|
||||
message = "Long running reducer finished executing",
|
||||
reducer_name,
|
||||
?total_duration,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
/// Logs an error `message` for `reducer` at `timestamp` into `replica_ctx`.
|
||||
fn log_reducer_error(replica_ctx: &ReplicaContext, timestamp: Timestamp, reducer: &str, message: &str) {
|
||||
let record = database_logger::Record {
|
||||
ts: chrono::DateTime::from_timestamp_micros(timestamp.to_micros_since_unix_epoch()).unwrap(),
|
||||
target: Some(reducer),
|
||||
filename: None,
|
||||
line_number: None,
|
||||
message,
|
||||
};
|
||||
replica_ctx.logger.write(database_logger::LogLevel::Error, &record, &());
|
||||
}
|
||||
|
||||
/// Detects lifecycle events for connecting/disconnecting a new client
|
||||
/// and inserts/removes into `st_clients` depending on which.
|
||||
fn lifecyle_modifications_to_tx(
|
||||
lifecycle: Option<Lifecycle>,
|
||||
caller_id: Identity,
|
||||
caller_conn_id: ConnectionId,
|
||||
db_id: Identity,
|
||||
tx: &mut MutTxId,
|
||||
) -> Result<(), Box<str>> {
|
||||
match lifecycle {
|
||||
Some(Lifecycle::OnConnect) => tx.insert_st_client(caller_id, caller_conn_id),
|
||||
Some(Lifecycle::OnDisconnect) => tx.delete_st_client(caller_id, caller_conn_id, db_id),
|
||||
_ => Ok(()),
|
||||
}
|
||||
.map_err(|e| e.to_string().into())
|
||||
}
|
||||
|
||||
/// Commits the transaction
|
||||
/// and evaluates and broadcasts subscriptions updates.
|
||||
fn commit_and_broadcast_event(
|
||||
info: &ModuleInfo,
|
||||
client: Option<Arc<ClientConnectionSender>>,
|
||||
event: ModuleEvent,
|
||||
tx: MutTxId,
|
||||
) -> Arc<ModuleEvent> {
|
||||
match info
|
||||
.subscriptions
|
||||
.commit_and_broadcast_event(client, event, tx)
|
||||
.unwrap()
|
||||
{
|
||||
Ok((event, _)) => event,
|
||||
Err(WriteConflict) => todo!("Write skew, you need to implement retries my man, T-dawg."),
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -189,14 +189,8 @@ impl module_host_actor::WasmInstance for WasmtimeInstance {
|
||||
self.store.data().instance_env()
|
||||
}
|
||||
|
||||
type Trap = anyhow::Error;
|
||||
|
||||
#[tracing::instrument(level = "trace", skip_all)]
|
||||
fn call_reducer(
|
||||
&mut self,
|
||||
op: ReducerOp<'_>,
|
||||
budget: ReducerBudget,
|
||||
) -> module_host_actor::ExecuteResult<Self::Trap> {
|
||||
fn call_reducer(&mut self, op: ReducerOp<'_>, budget: ReducerBudget) -> module_host_actor::ExecuteResult {
|
||||
let store = &mut self.store;
|
||||
// note that ReducerBudget being a u64 is load-bearing here - although we convert budget right back into
|
||||
// EnergyQuanta at the end of this function, from_energy_quanta clamps it to a u64 range.
|
||||
@@ -253,7 +247,7 @@ impl module_host_actor::WasmInstance for WasmtimeInstance {
|
||||
}
|
||||
}
|
||||
|
||||
fn log_traceback(func_type: &str, func: &str, trap: &Self::Trap) {
|
||||
fn log_traceback(func_type: &str, func: &str, trap: &anyhow::Error) {
|
||||
log_traceback(func_type, func, trap)
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user