From e07b7a4cff36b70d77bdc0bf860ea13339d2cbda Mon Sep 17 00:00:00 2001 From: joshua-spacetime Date: Tue, 5 May 2026 21:46:16 -0700 Subject: [PATCH] Make procedure instance pool configurable --- crates/core/src/config.rs | 115 ++++++++++++++++++--- crates/core/src/host/host_controller.rs | 12 +-- crates/core/src/host/module_host.rs | 8 +- crates/core/src/host/v8/mod.rs | 24 +++-- crates/standalone/config.toml | 5 + crates/standalone/src/lib.rs | 8 +- crates/standalone/src/subcommands/start.rs | 16 +-- crates/testing/src/modules.rs | 2 +- 8 files changed, 145 insertions(+), 45 deletions(-) diff --git a/crates/core/src/config.rs b/crates/core/src/config.rs index 5fe9ac2de..4a1406ebf 100644 --- a/crates/core/src/config.rs +++ b/crates/core/src/config.rs @@ -1,3 +1,4 @@ +use std::num::NonZeroUsize; use std::path::Path; use std::time::Duration; use std::{fmt, io}; @@ -132,15 +133,41 @@ impl fmt::Display for MetadataFile { } } +#[derive(Default)] +pub struct ConfigFile { + pub certificate_authority: Option, + pub logs: LogConfig, + pub v8: V8Config, +} + #[derive(serde::Deserialize, Default)] #[serde(rename_all = "kebab-case")] -pub struct ConfigFile { +struct ConfigFileToml { #[serde(default)] - pub certificate_authority: Option, + certificate_authority: Option, #[serde(default)] - pub logs: LogConfig, + logs: LogConfig, #[serde(default)] - pub v8_heap_policy: V8HeapPolicyConfig, + v8: V8ConfigToml, + #[serde(default)] + v8_heap_policy: V8HeapPolicyConfig, +} + +impl<'de> serde::Deserialize<'de> for ConfigFile { + fn deserialize(deserializer: D) -> Result + where + D: serde::Deserializer<'de>, + { + let config = ConfigFileToml::deserialize(deserializer)?; + Ok(Self { + certificate_authority: config.certificate_authority, + logs: config.logs, + v8: V8Config { + procedure_instance_pool_size: config.v8.procedure_instance_pool_size, + heap_policy: config.v8_heap_policy, + }, + }) + } } impl ConfigFile { @@ -179,6 +206,46 @@ pub struct LogConfig { pub directives: Vec, } +#[derive(Clone, Copy, Debug)] +pub struct V8Config { + pub procedure_instance_pool_size: NonZeroUsize, + pub heap_policy: V8HeapPolicyConfig, +} + +impl Default for V8Config { + fn default() -> Self { + Self { + procedure_instance_pool_size: default_v8_procedure_instance_pool_size(), + heap_policy: V8HeapPolicyConfig::default(), + } + } +} + +impl V8Config { + pub fn normalized(mut self) -> Self { + self.heap_policy = self.heap_policy.normalized(); + self + } +} + +#[derive(Clone, Copy, Debug, serde::Deserialize)] +#[serde(rename_all = "kebab-case")] +struct V8ConfigToml { + #[serde( + default = "default_v8_procedure_instance_pool_size", + deserialize_with = "de_nz_usize" + )] + pub procedure_instance_pool_size: NonZeroUsize, +} + +impl Default for V8ConfigToml { + fn default() -> Self { + Self { + procedure_instance_pool_size: default_v8_procedure_instance_pool_size(), + } + } +} + #[derive(Clone, Copy, Debug, serde::Deserialize)] #[serde(rename_all = "kebab-case")] pub struct V8HeapPolicyConfig { @@ -252,6 +319,18 @@ fn def_heap_limit() -> usize { 1024 * 1024 * 1024 } +fn default_v8_procedure_instance_pool_size() -> NonZeroUsize { + std::thread::available_parallelism().unwrap_or_else(|_| NonZeroUsize::new(1).unwrap()) +} + +fn de_nz_usize<'de, D>(deserializer: D) -> Result +where + D: serde::Deserializer<'de>, +{ + let value = usize::deserialize(deserializer)?; + NonZeroUsize::new(value).ok_or_else(|| serde::de::Error::custom("value must be greater than 0")) +} + fn de_nz_u64<'de, D>(deserializer: D) -> Result, D::Error> where D: serde::Deserializer<'de>, @@ -442,19 +521,26 @@ mod tests { fn v8_heap_policy_defaults_when_omitted() { let config: ConfigFile = toml::from_str("").unwrap(); - assert_eq!(config.v8_heap_policy.heap_check_request_interval, Some(4_096)); assert_eq!( - config.v8_heap_policy.heap_check_time_interval, + config.v8.procedure_instance_pool_size, + default_v8_procedure_instance_pool_size() + ); + assert_eq!(config.v8.heap_policy.heap_check_request_interval, Some(4_096)); + assert_eq!( + config.v8.heap_policy.heap_check_time_interval, Some(Duration::from_secs(5)) ); - assert_eq!(config.v8_heap_policy.heap_gc_trigger_fraction, 0.67); - assert_eq!(config.v8_heap_policy.heap_retire_fraction, 0.75); - assert_eq!(config.v8_heap_policy.heap_limit_bytes, 1024 * 1024 * 1024); + assert_eq!(config.v8.heap_policy.heap_gc_trigger_fraction, 0.67); + assert_eq!(config.v8.heap_policy.heap_retire_fraction, 0.75); + assert_eq!(config.v8.heap_policy.heap_limit_bytes, 1024 * 1024 * 1024); } #[test] fn v8_heap_policy_parses_from_toml() { let toml = r#" + [v8] + procedure-instance-pool-size = 3 + [v8-heap-policy] heap-check-request-interval = 0 heap-check-time-interval = "45s" @@ -465,13 +551,14 @@ mod tests { let config: ConfigFile = toml::from_str(toml).unwrap(); - assert_eq!(config.v8_heap_policy.heap_check_request_interval, None); + assert_eq!(config.v8.procedure_instance_pool_size.get(), 3); + assert_eq!(config.v8.heap_policy.heap_check_request_interval, None); assert_eq!( - config.v8_heap_policy.heap_check_time_interval, + config.v8.heap_policy.heap_check_time_interval, Some(Duration::from_secs(45)) ); - assert_eq!(config.v8_heap_policy.heap_gc_trigger_fraction, 0.6); - assert_eq!(config.v8_heap_policy.heap_retire_fraction, 0.8); - assert_eq!(config.v8_heap_policy.heap_limit_bytes, 256 * 1024 * 1024); + assert_eq!(config.v8.heap_policy.heap_gc_trigger_fraction, 0.6); + assert_eq!(config.v8.heap_policy.heap_retire_fraction, 0.8); + assert_eq!(config.v8.heap_policy.heap_limit_bytes, 256 * 1024 * 1024); } } diff --git a/crates/core/src/host/host_controller.rs b/crates/core/src/host/host_controller.rs index 66083f160..84fac36c9 100644 --- a/crates/core/src/host/host_controller.rs +++ b/crates/core/src/host/host_controller.rs @@ -3,7 +3,7 @@ use super::scheduler::SchedulerStarter; use super::wasmtime::WasmtimeRuntime; use super::{Scheduler, UpdateDatabaseResult}; use crate::client::{ClientActorId, ClientName}; -use crate::config::V8HeapPolicyConfig; +use crate::config::V8Config; use crate::database_logger::DatabaseLogger; use crate::db::persistence::PersistenceProvider; use crate::db::relational_db::{self, spawn_view_cleanup_loop, DiskSizeFn, RelationalDB, Txdata}; @@ -125,9 +125,9 @@ pub(crate) struct HostRuntimes { } impl HostRuntimes { - fn new(data_dir: Option<&ServerDataDir>, v8_heap_policy: V8HeapPolicyConfig) -> Arc { + fn new(data_dir: Option<&ServerDataDir>, v8_config: V8Config) -> Arc { let wasmtime = WasmtimeRuntime::new(data_dir); - let v8 = V8Runtime::new(v8_heap_policy); + let v8 = V8Runtime::new(v8_config); Arc::new(Self { wasmtime, v8 }) } } @@ -211,7 +211,7 @@ impl HostController { pub fn new( data_dir: Arc, default_config: db::Config, - v8_heap_policy: V8HeapPolicyConfig, + v8_config: V8Config, program_storage: ProgramStorage, energy_monitor: Arc, persistence: Arc, @@ -223,7 +223,7 @@ impl HostController { program_storage, energy_monitor, persistence, - runtimes: HostRuntimes::new(Some(&data_dir), v8_heap_policy), + runtimes: HostRuntimes::new(Some(&data_dir), v8_config), data_dir, page_pool: PagePool::new(default_config.page_pool_max_size), bsatn_rlb_pool: BsatnRowListBuilderPool::new(), @@ -1365,7 +1365,7 @@ pub async fn extract_schema(program_bytes: Box<[u8]>, host_type: HostType) -> an extract_schema_with_pools( PagePool::new(None), BsatnRowListBuilderPool::new(), - &HostRuntimes::new(None, V8HeapPolicyConfig::default()), + &HostRuntimes::new(None, V8Config::default()), program_bytes, host_type, ) diff --git a/crates/core/src/host/module_host.rs b/crates/core/src/host/module_host.rs index 71a837d8a..5a645b2bc 100644 --- a/crates/core/src/host/module_host.rs +++ b/crates/core/src/host/module_host.rs @@ -1010,11 +1010,6 @@ impl CreateInstanceTimeMetric { } } -fn default_procedure_instance_limit() -> NonZeroUsize { - let num_cores = std::thread::available_parallelism().map(NonZeroUsize::get).unwrap_or(1); - NonZeroUsize::new(num_cores.saturating_mul(2)).expect("procedure instance limit should be non-zero") -} - impl ModuleInstanceManager { fn new(module: M, init_inst: Option, database_identity: Identity) -> Self { let metrics = InstanceManagerMetrics::new(module.host_type(), database_identity); @@ -1335,13 +1330,14 @@ impl ModuleHost { ModuleWithInstance::Js { module, init_inst } => { info = module.info(); let metrics = module.metrics(); + let procedure_instance_pool_size = module.procedure_instance_pool_size(); let host_module = module.clone(); let main_instance = SharedJsMainInstanceManager::new(init_inst, metrics.clone()); let procedure_instances = ModuleInstanceManager::new_bounded_with_metrics( module, None, metrics, - default_procedure_instance_limit(), + procedure_instance_pool_size, ); Arc::new(ModuleHostInner::Js(Box::new(V8ModuleHost { module: host_module, diff --git a/crates/core/src/host/v8/mod.rs b/crates/core/src/host/v8/mod.rs index 77da0caf4..f8bdc0a75 100644 --- a/crates/core/src/host/v8/mod.rs +++ b/crates/core/src/host/v8/mod.rs @@ -15,7 +15,7 @@ use super::module_host::{ }; use super::UpdateDatabaseResult; use crate::client::{ClientActorId, MeteredUnboundedReceiver, MeteredUnboundedSender}; -use crate::config::V8HeapPolicyConfig; +use crate::config::{V8Config, V8HeapPolicyConfig}; use crate::host::host_controller::CallProcedureReturn; use crate::host::instance_env::{ChunkPool, InstanceEnv, TxSlot}; use crate::host::module_host::{ @@ -51,6 +51,7 @@ use spacetimedb_schema::def::ModuleDef; use spacetimedb_schema::identifier::Identifier; use spacetimedb_table::static_assert_size; use std::cell::Cell; +use std::num::NonZeroUsize; use std::os::raw::c_void; use std::panic::{self, AssertUnwindSafe}; use std::sync::{Arc, LazyLock}; @@ -75,19 +76,19 @@ mod util; /// The V8 runtime, for modules written in e.g., JS or TypeScript. pub struct V8Runtime { - heap_policy: V8HeapPolicyConfig, + config: V8Config, } impl Default for V8Runtime { fn default() -> Self { - Self::new(V8HeapPolicyConfig::default()) + Self::new(V8Config::default()) } } impl V8Runtime { - pub fn new(heap_policy: V8HeapPolicyConfig) -> Self { + pub fn new(config: V8Config) -> Self { Self { - heap_policy: heap_policy.normalized(), + config: config.normalized(), } } @@ -98,7 +99,7 @@ impl V8Runtime { core: AllocatedJobCore, ) -> anyhow::Result { V8_RUNTIME_GLOBAL - .make_actor(mcc, program_bytes, core, self.heap_policy) + .make_actor(mcc, program_bytes, core, self.config) .await } } @@ -166,7 +167,7 @@ impl V8RuntimeInner { mcc: ModuleCreationContext, program_bytes: &[u8], core: AllocatedJobCore, - heap_policy: V8HeapPolicyConfig, + config: V8Config, ) -> anyhow::Result { #![allow(unreachable_code, unused_variables)] @@ -184,6 +185,7 @@ impl V8RuntimeInner { let mcc = Either::Right(mcc); let load_balance_guard = Arc::new(core.guard); let core_pinner = core.pinner; + let heap_policy = config.heap_policy; let (common, init_inst) = spawn_main_instance_worker( program.clone(), mcc, @@ -198,7 +200,8 @@ impl V8RuntimeInner { program, load_balance_guard, core_pinner, - heap_policy, + procedure_instance_pool_size: config.procedure_instance_pool_size, + heap_policy: config.heap_policy, metrics, }; @@ -212,6 +215,7 @@ pub struct JsModule { program: Arc, load_balance_guard: Arc, core_pinner: CorePinner, + procedure_instance_pool_size: NonZeroUsize, heap_policy: V8HeapPolicyConfig, metrics: InstanceManagerMetrics, } @@ -233,6 +237,10 @@ impl JsModule { self.metrics.clone() } + pub(in crate::host) fn procedure_instance_pool_size(&self) -> NonZeroUsize { + self.procedure_instance_pool_size + } + async fn create_procedure_instance(&self) -> JsProcedureInstance { let program = self.program.clone(); let common = self.common.clone(); diff --git a/crates/standalone/config.toml b/crates/standalone/config.toml index e7d1aec30..307bc808a 100644 --- a/crates/standalone/config.toml +++ b/crates/standalone/config.toml @@ -18,6 +18,11 @@ directives = [ "axum::rejection=trace", ] +[v8] +# Maximum number of JS procedure isolates per database. Omit to use the number +# of cores reported by the OS. +# procedure-instance-pool-size = 8 + [v8-heap-policy] # Check the V8 heap after this many requests. Set to 0 to disable. # heap-check-request-interval = 65536 diff --git a/crates/standalone/src/lib.rs b/crates/standalone/src/lib.rs index 68450a1b2..3b6a01a11 100644 --- a/crates/standalone/src/lib.rs +++ b/crates/standalone/src/lib.rs @@ -10,7 +10,7 @@ use async_trait::async_trait; use clap::{ArgMatches, Command}; use http::StatusCode; use spacetimedb::client::ClientActorIndex; -use spacetimedb::config::{CertificateAuthority, MetadataFile, V8HeapPolicyConfig}; +use spacetimedb::config::{CertificateAuthority, MetadataFile, V8Config}; use spacetimedb::db; use spacetimedb::db::persistence::LocalPersistenceProvider; use spacetimedb::energy::{EnergyBalance, EnergyQuanta, NullEnergyMonitor}; @@ -42,7 +42,7 @@ pub use spacetimedb_client_api::routes::subscribe::{BIN_PROTOCOL, TEXT_PROTOCOL} pub struct StandaloneOptions { pub db_config: db::Config, pub websocket: WebSocketOptions, - pub v8_heap_policy: V8HeapPolicyConfig, + pub v8: V8Config, } pub struct StandaloneEnv { @@ -79,7 +79,7 @@ impl StandaloneEnv { let host_controller = HostController::new( data_dir, config.db_config, - config.v8_heap_policy, + config.v8, program_store.clone(), energy_monitor, persistence_provider, @@ -650,7 +650,7 @@ mod tests { page_pool_max_size: None, }, websocket: WebSocketOptions::default(), - v8_heap_policy: V8HeapPolicyConfig::default(), + v8: V8Config::default(), }; let _env = StandaloneEnv::init(config, &ca, data_dir.clone(), JobCores::without_pinned_cores()).await?; diff --git a/crates/standalone/src/subcommands/start.rs b/crates/standalone/src/subcommands/start.rs index 1792a9f3c..b407372aa 100644 --- a/crates/standalone/src/subcommands/start.rs +++ b/crates/standalone/src/subcommands/start.rs @@ -182,7 +182,7 @@ pub async fn exec(args: &ArgMatches, db_cores: JobCores) -> anyhow::Result<()> { StandaloneOptions { db_config, websocket: config.websocket, - v8_heap_policy: config.common.v8_heap_policy, + v8: config.common.v8, }, &certs, data_dir, @@ -512,6 +512,9 @@ mod tests { idle-timeout = "1min" close-handshake-timeout = "500ms" + [v8] + procedure-instance-pool-size = 3 + [v8-heap-policy] heap-check-request-interval = 0 heap-check-time-interval = "45s" @@ -526,14 +529,15 @@ mod tests { // so check `common` in a pedestrian way. assert_eq!(&config.common.logs.directives, &["banana_shake=strawberry"]); assert!(config.common.certificate_authority.is_none()); - assert_eq!(config.common.v8_heap_policy.heap_check_request_interval, None); + assert_eq!(config.common.v8.procedure_instance_pool_size.get(), 3); + assert_eq!(config.common.v8.heap_policy.heap_check_request_interval, None); assert_eq!( - config.common.v8_heap_policy.heap_check_time_interval, + config.common.v8.heap_policy.heap_check_time_interval, Some(Duration::from_secs(45)) ); - assert_eq!(config.common.v8_heap_policy.heap_gc_trigger_fraction, 0.6); - assert_eq!(config.common.v8_heap_policy.heap_retire_fraction, 0.8); - assert_eq!(config.common.v8_heap_policy.heap_limit_bytes, 128 * 1024 * 1024); + assert_eq!(config.common.v8.heap_policy.heap_gc_trigger_fraction, 0.6); + assert_eq!(config.common.v8.heap_policy.heap_retire_fraction, 0.8); + assert_eq!(config.common.v8.heap_policy.heap_limit_bytes, 128 * 1024 * 1024); assert_eq!( config.websocket, diff --git a/crates/testing/src/modules.rs b/crates/testing/src/modules.rs index 318fff8d0..03ba40c4e 100644 --- a/crates/testing/src/modules.rs +++ b/crates/testing/src/modules.rs @@ -201,7 +201,7 @@ impl CompiledModule { spacetimedb_standalone::StandaloneOptions { db_config: config, websocket: WebSocketOptions::default(), - v8_heap_policy: Default::default(), + v8: Default::default(), }, &certs, paths.data_dir.into(),