mirror of
https://github.com/clockworklabs/SpacetimeDB.git
synced 2026-05-06 07:26:43 -04:00
Make procedure instance pool configurable
This commit is contained in:
+101
-14
@@ -1,3 +1,4 @@
|
||||
use std::num::NonZeroUsize;
|
||||
use std::path::Path;
|
||||
use std::time::Duration;
|
||||
use std::{fmt, io};
|
||||
@@ -132,15 +133,41 @@ impl fmt::Display for MetadataFile {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
pub struct ConfigFile {
|
||||
pub certificate_authority: Option<CertificateAuthority>,
|
||||
pub logs: LogConfig,
|
||||
pub v8: V8Config,
|
||||
}
|
||||
|
||||
#[derive(serde::Deserialize, Default)]
|
||||
#[serde(rename_all = "kebab-case")]
|
||||
pub struct ConfigFile {
|
||||
struct ConfigFileToml {
|
||||
#[serde(default)]
|
||||
pub certificate_authority: Option<CertificateAuthority>,
|
||||
certificate_authority: Option<CertificateAuthority>,
|
||||
#[serde(default)]
|
||||
pub logs: LogConfig,
|
||||
logs: LogConfig,
|
||||
#[serde(default)]
|
||||
pub v8_heap_policy: V8HeapPolicyConfig,
|
||||
v8: V8ConfigToml,
|
||||
#[serde(default)]
|
||||
v8_heap_policy: V8HeapPolicyConfig,
|
||||
}
|
||||
|
||||
impl<'de> serde::Deserialize<'de> for ConfigFile {
|
||||
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
|
||||
where
|
||||
D: serde::Deserializer<'de>,
|
||||
{
|
||||
let config = ConfigFileToml::deserialize(deserializer)?;
|
||||
Ok(Self {
|
||||
certificate_authority: config.certificate_authority,
|
||||
logs: config.logs,
|
||||
v8: V8Config {
|
||||
procedure_instance_pool_size: config.v8.procedure_instance_pool_size,
|
||||
heap_policy: config.v8_heap_policy,
|
||||
},
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl ConfigFile {
|
||||
@@ -179,6 +206,46 @@ pub struct LogConfig {
|
||||
pub directives: Vec<String>,
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy, Debug)]
|
||||
pub struct V8Config {
|
||||
pub procedure_instance_pool_size: NonZeroUsize,
|
||||
pub heap_policy: V8HeapPolicyConfig,
|
||||
}
|
||||
|
||||
impl Default for V8Config {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
procedure_instance_pool_size: default_v8_procedure_instance_pool_size(),
|
||||
heap_policy: V8HeapPolicyConfig::default(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl V8Config {
|
||||
pub fn normalized(mut self) -> Self {
|
||||
self.heap_policy = self.heap_policy.normalized();
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy, Debug, serde::Deserialize)]
|
||||
#[serde(rename_all = "kebab-case")]
|
||||
struct V8ConfigToml {
|
||||
#[serde(
|
||||
default = "default_v8_procedure_instance_pool_size",
|
||||
deserialize_with = "de_nz_usize"
|
||||
)]
|
||||
pub procedure_instance_pool_size: NonZeroUsize,
|
||||
}
|
||||
|
||||
impl Default for V8ConfigToml {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
procedure_instance_pool_size: default_v8_procedure_instance_pool_size(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy, Debug, serde::Deserialize)]
|
||||
#[serde(rename_all = "kebab-case")]
|
||||
pub struct V8HeapPolicyConfig {
|
||||
@@ -252,6 +319,18 @@ fn def_heap_limit() -> usize {
|
||||
1024 * 1024 * 1024
|
||||
}
|
||||
|
||||
fn default_v8_procedure_instance_pool_size() -> NonZeroUsize {
|
||||
std::thread::available_parallelism().unwrap_or_else(|_| NonZeroUsize::new(1).unwrap())
|
||||
}
|
||||
|
||||
fn de_nz_usize<'de, D>(deserializer: D) -> Result<NonZeroUsize, D::Error>
|
||||
where
|
||||
D: serde::Deserializer<'de>,
|
||||
{
|
||||
let value = usize::deserialize(deserializer)?;
|
||||
NonZeroUsize::new(value).ok_or_else(|| serde::de::Error::custom("value must be greater than 0"))
|
||||
}
|
||||
|
||||
fn de_nz_u64<'de, D>(deserializer: D) -> Result<Option<u64>, D::Error>
|
||||
where
|
||||
D: serde::Deserializer<'de>,
|
||||
@@ -442,19 +521,26 @@ mod tests {
|
||||
fn v8_heap_policy_defaults_when_omitted() {
|
||||
let config: ConfigFile = toml::from_str("").unwrap();
|
||||
|
||||
assert_eq!(config.v8_heap_policy.heap_check_request_interval, Some(4_096));
|
||||
assert_eq!(
|
||||
config.v8_heap_policy.heap_check_time_interval,
|
||||
config.v8.procedure_instance_pool_size,
|
||||
default_v8_procedure_instance_pool_size()
|
||||
);
|
||||
assert_eq!(config.v8.heap_policy.heap_check_request_interval, Some(4_096));
|
||||
assert_eq!(
|
||||
config.v8.heap_policy.heap_check_time_interval,
|
||||
Some(Duration::from_secs(5))
|
||||
);
|
||||
assert_eq!(config.v8_heap_policy.heap_gc_trigger_fraction, 0.67);
|
||||
assert_eq!(config.v8_heap_policy.heap_retire_fraction, 0.75);
|
||||
assert_eq!(config.v8_heap_policy.heap_limit_bytes, 1024 * 1024 * 1024);
|
||||
assert_eq!(config.v8.heap_policy.heap_gc_trigger_fraction, 0.67);
|
||||
assert_eq!(config.v8.heap_policy.heap_retire_fraction, 0.75);
|
||||
assert_eq!(config.v8.heap_policy.heap_limit_bytes, 1024 * 1024 * 1024);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn v8_heap_policy_parses_from_toml() {
|
||||
let toml = r#"
|
||||
[v8]
|
||||
procedure-instance-pool-size = 3
|
||||
|
||||
[v8-heap-policy]
|
||||
heap-check-request-interval = 0
|
||||
heap-check-time-interval = "45s"
|
||||
@@ -465,13 +551,14 @@ mod tests {
|
||||
|
||||
let config: ConfigFile = toml::from_str(toml).unwrap();
|
||||
|
||||
assert_eq!(config.v8_heap_policy.heap_check_request_interval, None);
|
||||
assert_eq!(config.v8.procedure_instance_pool_size.get(), 3);
|
||||
assert_eq!(config.v8.heap_policy.heap_check_request_interval, None);
|
||||
assert_eq!(
|
||||
config.v8_heap_policy.heap_check_time_interval,
|
||||
config.v8.heap_policy.heap_check_time_interval,
|
||||
Some(Duration::from_secs(45))
|
||||
);
|
||||
assert_eq!(config.v8_heap_policy.heap_gc_trigger_fraction, 0.6);
|
||||
assert_eq!(config.v8_heap_policy.heap_retire_fraction, 0.8);
|
||||
assert_eq!(config.v8_heap_policy.heap_limit_bytes, 256 * 1024 * 1024);
|
||||
assert_eq!(config.v8.heap_policy.heap_gc_trigger_fraction, 0.6);
|
||||
assert_eq!(config.v8.heap_policy.heap_retire_fraction, 0.8);
|
||||
assert_eq!(config.v8.heap_policy.heap_limit_bytes, 256 * 1024 * 1024);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3,7 +3,7 @@ use super::scheduler::SchedulerStarter;
|
||||
use super::wasmtime::WasmtimeRuntime;
|
||||
use super::{Scheduler, UpdateDatabaseResult};
|
||||
use crate::client::{ClientActorId, ClientName};
|
||||
use crate::config::V8HeapPolicyConfig;
|
||||
use crate::config::V8Config;
|
||||
use crate::database_logger::DatabaseLogger;
|
||||
use crate::db::persistence::PersistenceProvider;
|
||||
use crate::db::relational_db::{self, spawn_view_cleanup_loop, DiskSizeFn, RelationalDB, Txdata};
|
||||
@@ -125,9 +125,9 @@ pub(crate) struct HostRuntimes {
|
||||
}
|
||||
|
||||
impl HostRuntimes {
|
||||
fn new(data_dir: Option<&ServerDataDir>, v8_heap_policy: V8HeapPolicyConfig) -> Arc<Self> {
|
||||
fn new(data_dir: Option<&ServerDataDir>, v8_config: V8Config) -> Arc<Self> {
|
||||
let wasmtime = WasmtimeRuntime::new(data_dir);
|
||||
let v8 = V8Runtime::new(v8_heap_policy);
|
||||
let v8 = V8Runtime::new(v8_config);
|
||||
Arc::new(Self { wasmtime, v8 })
|
||||
}
|
||||
}
|
||||
@@ -211,7 +211,7 @@ impl HostController {
|
||||
pub fn new(
|
||||
data_dir: Arc<ServerDataDir>,
|
||||
default_config: db::Config,
|
||||
v8_heap_policy: V8HeapPolicyConfig,
|
||||
v8_config: V8Config,
|
||||
program_storage: ProgramStorage,
|
||||
energy_monitor: Arc<impl EnergyMonitor>,
|
||||
persistence: Arc<dyn PersistenceProvider>,
|
||||
@@ -223,7 +223,7 @@ impl HostController {
|
||||
program_storage,
|
||||
energy_monitor,
|
||||
persistence,
|
||||
runtimes: HostRuntimes::new(Some(&data_dir), v8_heap_policy),
|
||||
runtimes: HostRuntimes::new(Some(&data_dir), v8_config),
|
||||
data_dir,
|
||||
page_pool: PagePool::new(default_config.page_pool_max_size),
|
||||
bsatn_rlb_pool: BsatnRowListBuilderPool::new(),
|
||||
@@ -1365,7 +1365,7 @@ pub async fn extract_schema(program_bytes: Box<[u8]>, host_type: HostType) -> an
|
||||
extract_schema_with_pools(
|
||||
PagePool::new(None),
|
||||
BsatnRowListBuilderPool::new(),
|
||||
&HostRuntimes::new(None, V8HeapPolicyConfig::default()),
|
||||
&HostRuntimes::new(None, V8Config::default()),
|
||||
program_bytes,
|
||||
host_type,
|
||||
)
|
||||
|
||||
@@ -1010,11 +1010,6 @@ impl CreateInstanceTimeMetric {
|
||||
}
|
||||
}
|
||||
|
||||
fn default_procedure_instance_limit() -> NonZeroUsize {
|
||||
let num_cores = std::thread::available_parallelism().map(NonZeroUsize::get).unwrap_or(1);
|
||||
NonZeroUsize::new(num_cores.saturating_mul(2)).expect("procedure instance limit should be non-zero")
|
||||
}
|
||||
|
||||
impl<M: GenericModule> ModuleInstanceManager<M> {
|
||||
fn new(module: M, init_inst: Option<M::Instance>, database_identity: Identity) -> Self {
|
||||
let metrics = InstanceManagerMetrics::new(module.host_type(), database_identity);
|
||||
@@ -1335,13 +1330,14 @@ impl ModuleHost {
|
||||
ModuleWithInstance::Js { module, init_inst } => {
|
||||
info = module.info();
|
||||
let metrics = module.metrics();
|
||||
let procedure_instance_pool_size = module.procedure_instance_pool_size();
|
||||
let host_module = module.clone();
|
||||
let main_instance = SharedJsMainInstanceManager::new(init_inst, metrics.clone());
|
||||
let procedure_instances = ModuleInstanceManager::new_bounded_with_metrics(
|
||||
module,
|
||||
None,
|
||||
metrics,
|
||||
default_procedure_instance_limit(),
|
||||
procedure_instance_pool_size,
|
||||
);
|
||||
Arc::new(ModuleHostInner::Js(Box::new(V8ModuleHost {
|
||||
module: host_module,
|
||||
|
||||
@@ -15,7 +15,7 @@ use super::module_host::{
|
||||
};
|
||||
use super::UpdateDatabaseResult;
|
||||
use crate::client::{ClientActorId, MeteredUnboundedReceiver, MeteredUnboundedSender};
|
||||
use crate::config::V8HeapPolicyConfig;
|
||||
use crate::config::{V8Config, V8HeapPolicyConfig};
|
||||
use crate::host::host_controller::CallProcedureReturn;
|
||||
use crate::host::instance_env::{ChunkPool, InstanceEnv, TxSlot};
|
||||
use crate::host::module_host::{
|
||||
@@ -51,6 +51,7 @@ use spacetimedb_schema::def::ModuleDef;
|
||||
use spacetimedb_schema::identifier::Identifier;
|
||||
use spacetimedb_table::static_assert_size;
|
||||
use std::cell::Cell;
|
||||
use std::num::NonZeroUsize;
|
||||
use std::os::raw::c_void;
|
||||
use std::panic::{self, AssertUnwindSafe};
|
||||
use std::sync::{Arc, LazyLock};
|
||||
@@ -75,19 +76,19 @@ mod util;
|
||||
|
||||
/// The V8 runtime, for modules written in e.g., JS or TypeScript.
|
||||
pub struct V8Runtime {
|
||||
heap_policy: V8HeapPolicyConfig,
|
||||
config: V8Config,
|
||||
}
|
||||
|
||||
impl Default for V8Runtime {
|
||||
fn default() -> Self {
|
||||
Self::new(V8HeapPolicyConfig::default())
|
||||
Self::new(V8Config::default())
|
||||
}
|
||||
}
|
||||
|
||||
impl V8Runtime {
|
||||
pub fn new(heap_policy: V8HeapPolicyConfig) -> Self {
|
||||
pub fn new(config: V8Config) -> Self {
|
||||
Self {
|
||||
heap_policy: heap_policy.normalized(),
|
||||
config: config.normalized(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -98,7 +99,7 @@ impl V8Runtime {
|
||||
core: AllocatedJobCore,
|
||||
) -> anyhow::Result<ModuleWithInstance> {
|
||||
V8_RUNTIME_GLOBAL
|
||||
.make_actor(mcc, program_bytes, core, self.heap_policy)
|
||||
.make_actor(mcc, program_bytes, core, self.config)
|
||||
.await
|
||||
}
|
||||
}
|
||||
@@ -166,7 +167,7 @@ impl V8RuntimeInner {
|
||||
mcc: ModuleCreationContext,
|
||||
program_bytes: &[u8],
|
||||
core: AllocatedJobCore,
|
||||
heap_policy: V8HeapPolicyConfig,
|
||||
config: V8Config,
|
||||
) -> anyhow::Result<ModuleWithInstance> {
|
||||
#![allow(unreachable_code, unused_variables)]
|
||||
|
||||
@@ -184,6 +185,7 @@ impl V8RuntimeInner {
|
||||
let mcc = Either::Right(mcc);
|
||||
let load_balance_guard = Arc::new(core.guard);
|
||||
let core_pinner = core.pinner;
|
||||
let heap_policy = config.heap_policy;
|
||||
let (common, init_inst) = spawn_main_instance_worker(
|
||||
program.clone(),
|
||||
mcc,
|
||||
@@ -198,7 +200,8 @@ impl V8RuntimeInner {
|
||||
program,
|
||||
load_balance_guard,
|
||||
core_pinner,
|
||||
heap_policy,
|
||||
procedure_instance_pool_size: config.procedure_instance_pool_size,
|
||||
heap_policy: config.heap_policy,
|
||||
metrics,
|
||||
};
|
||||
|
||||
@@ -212,6 +215,7 @@ pub struct JsModule {
|
||||
program: Arc<str>,
|
||||
load_balance_guard: Arc<LoadBalanceOnDropGuard>,
|
||||
core_pinner: CorePinner,
|
||||
procedure_instance_pool_size: NonZeroUsize,
|
||||
heap_policy: V8HeapPolicyConfig,
|
||||
metrics: InstanceManagerMetrics,
|
||||
}
|
||||
@@ -233,6 +237,10 @@ impl JsModule {
|
||||
self.metrics.clone()
|
||||
}
|
||||
|
||||
pub(in crate::host) fn procedure_instance_pool_size(&self) -> NonZeroUsize {
|
||||
self.procedure_instance_pool_size
|
||||
}
|
||||
|
||||
async fn create_procedure_instance(&self) -> JsProcedureInstance {
|
||||
let program = self.program.clone();
|
||||
let common = self.common.clone();
|
||||
|
||||
@@ -18,6 +18,11 @@ directives = [
|
||||
"axum::rejection=trace",
|
||||
]
|
||||
|
||||
[v8]
|
||||
# Maximum number of JS procedure isolates per database. Omit to use the number
|
||||
# of cores reported by the OS.
|
||||
# procedure-instance-pool-size = 8
|
||||
|
||||
[v8-heap-policy]
|
||||
# Check the V8 heap after this many requests. Set to 0 to disable.
|
||||
# heap-check-request-interval = 65536
|
||||
|
||||
@@ -10,7 +10,7 @@ use async_trait::async_trait;
|
||||
use clap::{ArgMatches, Command};
|
||||
use http::StatusCode;
|
||||
use spacetimedb::client::ClientActorIndex;
|
||||
use spacetimedb::config::{CertificateAuthority, MetadataFile, V8HeapPolicyConfig};
|
||||
use spacetimedb::config::{CertificateAuthority, MetadataFile, V8Config};
|
||||
use spacetimedb::db;
|
||||
use spacetimedb::db::persistence::LocalPersistenceProvider;
|
||||
use spacetimedb::energy::{EnergyBalance, EnergyQuanta, NullEnergyMonitor};
|
||||
@@ -42,7 +42,7 @@ pub use spacetimedb_client_api::routes::subscribe::{BIN_PROTOCOL, TEXT_PROTOCOL}
|
||||
pub struct StandaloneOptions {
|
||||
pub db_config: db::Config,
|
||||
pub websocket: WebSocketOptions,
|
||||
pub v8_heap_policy: V8HeapPolicyConfig,
|
||||
pub v8: V8Config,
|
||||
}
|
||||
|
||||
pub struct StandaloneEnv {
|
||||
@@ -79,7 +79,7 @@ impl StandaloneEnv {
|
||||
let host_controller = HostController::new(
|
||||
data_dir,
|
||||
config.db_config,
|
||||
config.v8_heap_policy,
|
||||
config.v8,
|
||||
program_store.clone(),
|
||||
energy_monitor,
|
||||
persistence_provider,
|
||||
@@ -650,7 +650,7 @@ mod tests {
|
||||
page_pool_max_size: None,
|
||||
},
|
||||
websocket: WebSocketOptions::default(),
|
||||
v8_heap_policy: V8HeapPolicyConfig::default(),
|
||||
v8: V8Config::default(),
|
||||
};
|
||||
|
||||
let _env = StandaloneEnv::init(config, &ca, data_dir.clone(), JobCores::without_pinned_cores()).await?;
|
||||
|
||||
@@ -182,7 +182,7 @@ pub async fn exec(args: &ArgMatches, db_cores: JobCores) -> anyhow::Result<()> {
|
||||
StandaloneOptions {
|
||||
db_config,
|
||||
websocket: config.websocket,
|
||||
v8_heap_policy: config.common.v8_heap_policy,
|
||||
v8: config.common.v8,
|
||||
},
|
||||
&certs,
|
||||
data_dir,
|
||||
@@ -512,6 +512,9 @@ mod tests {
|
||||
idle-timeout = "1min"
|
||||
close-handshake-timeout = "500ms"
|
||||
|
||||
[v8]
|
||||
procedure-instance-pool-size = 3
|
||||
|
||||
[v8-heap-policy]
|
||||
heap-check-request-interval = 0
|
||||
heap-check-time-interval = "45s"
|
||||
@@ -526,14 +529,15 @@ mod tests {
|
||||
// so check `common` in a pedestrian way.
|
||||
assert_eq!(&config.common.logs.directives, &["banana_shake=strawberry"]);
|
||||
assert!(config.common.certificate_authority.is_none());
|
||||
assert_eq!(config.common.v8_heap_policy.heap_check_request_interval, None);
|
||||
assert_eq!(config.common.v8.procedure_instance_pool_size.get(), 3);
|
||||
assert_eq!(config.common.v8.heap_policy.heap_check_request_interval, None);
|
||||
assert_eq!(
|
||||
config.common.v8_heap_policy.heap_check_time_interval,
|
||||
config.common.v8.heap_policy.heap_check_time_interval,
|
||||
Some(Duration::from_secs(45))
|
||||
);
|
||||
assert_eq!(config.common.v8_heap_policy.heap_gc_trigger_fraction, 0.6);
|
||||
assert_eq!(config.common.v8_heap_policy.heap_retire_fraction, 0.8);
|
||||
assert_eq!(config.common.v8_heap_policy.heap_limit_bytes, 128 * 1024 * 1024);
|
||||
assert_eq!(config.common.v8.heap_policy.heap_gc_trigger_fraction, 0.6);
|
||||
assert_eq!(config.common.v8.heap_policy.heap_retire_fraction, 0.8);
|
||||
assert_eq!(config.common.v8.heap_policy.heap_limit_bytes, 128 * 1024 * 1024);
|
||||
|
||||
assert_eq!(
|
||||
config.websocket,
|
||||
|
||||
@@ -201,7 +201,7 @@ impl CompiledModule {
|
||||
spacetimedb_standalone::StandaloneOptions {
|
||||
db_config: config,
|
||||
websocket: WebSocketOptions::default(),
|
||||
v8_heap_policy: Default::default(),
|
||||
v8: Default::default(),
|
||||
},
|
||||
&certs,
|
||||
paths.data_dir.into(),
|
||||
|
||||
Reference in New Issue
Block a user