diff --git a/crates/client-api/src/routes/database.rs b/crates/client-api/src/routes/database.rs
index 8a272002e..6b753a9c8 100644
--- a/crates/client-api/src/routes/database.rs
+++ b/crates/client-api/src/routes/database.rs
@@ -98,6 +98,7 @@ fn map_reducer_error(e: ReducerCallError, reducer: &str) -> (StatusCode, String)
             log::debug!("Attempt to call non-existent reducer {reducer}");
             StatusCode::NOT_FOUND
         }
+        ReducerCallError::WorkerError(_) => StatusCode::INTERNAL_SERVER_ERROR,
         ReducerCallError::LifecycleReducer(lifecycle) => {
             log::debug!("Attempt to call {lifecycle:?} lifecycle reducer {reducer}");
             StatusCode::BAD_REQUEST
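For context on the new arm above, here is a minimal standalone sketch, not taken from the patch and using only std stand-in types, of the intended HTTP mapping: a lost worker is a host-side fault (500), while an unknown reducer stays a caller error (404).

enum ReducerCallError {
    WorkerError(String),
    NoSuchReducer,
}

fn status_code(e: &ReducerCallError) -> u16 {
    match e {
        ReducerCallError::WorkerError(_) => 500, // host-side fault
        ReducerCallError::NoSuchReducer => 404,  // caller asked for a missing reducer
    }
}

fn main() {
    assert_eq!(status_code(&ReducerCallError::WorkerError("gone".into())), 500);
    assert_eq!(status_code(&ReducerCallError::NoSuchReducer), 404);
}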
diff --git a/crates/core/src/host/module_host.rs b/crates/core/src/host/module_host.rs
index 2401a64b8..645b61bbc 100644
--- a/crates/core/src/host/module_host.rs
+++ b/crates/core/src/host/module_host.rs
@@ -11,7 +11,7 @@ use crate::error::DBError;
 use crate::estimation::{check_row_limit, estimate_rows_scanned};
 use crate::hash::Hash;
 use crate::host::host_controller::CallProcedureReturn;
-use crate::host::scheduler::{CallScheduledFunctionResult, ScheduledFunctionParams};
+use crate::host::scheduler::{CallScheduledFunctionError, CallScheduledFunctionResult, ScheduledFunctionParams};
 use crate::host::v8::JsInstance;
 pub use crate::host::wasm_common::module_host_actor::{InstanceCommon, WasmInstance};
 use crate::host::wasmtime::ModuleInstance;
@@ -357,7 +357,9 @@ struct WasmtimeModuleHost {
 }
 
 struct V8ModuleHost {
-    instance_manager: ModuleInstanceManager<super::v8::JsModule>,
+    module: super::v8::JsModule,
+    instance_lane: super::v8::JsInstanceLane,
+    procedure_instances: ModuleInstanceManager<super::v8::JsModule>,
 }
 
 /// A module; used as a bound on `InstanceManager`.
@@ -813,7 +815,7 @@ impl CreateInstanceTimeMetric {
 }
 
 impl<M: Module> ModuleInstanceManager<M> {
-    fn new(module: M, init_inst: M::Instance, database_identity: Identity) -> Self {
+    fn new(module: M, init_inst: Option<M::Instance>, database_identity: Identity) -> Self {
         let host_type = module.host_type();
         let module_instances_metric = ModuleInstancesMetric {
             metric: WORKER_METRICS
@@ -832,9 +834,8 @@ impl<M: Module> ModuleInstanceManager<M> {
             database_identity,
         };
 
-        // Add the first instance.
         let mut instances = VecDeque::new();
-        instances.push_front(init_inst);
+        instances.extend(init_inst);
 
         Self {
             instances: Mutex::new(instances),
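Illustrative sketch, not part of the patch, using only the standard library: `Option<T>` implements `IntoIterator` over zero or one items, so `extend` seeds the deque only when an eager initial instance exists.

use std::collections::VecDeque;

fn seed(init: Option<u32>) -> VecDeque<u32> {
    let mut instances = VecDeque::new();
    instances.extend(init); // no-op for `None`, one push for `Some`
    instances
}

fn main() {
    assert_eq!(seed(Some(7)).len(), 1); // WASM path: eager first instance
    assert_eq!(seed(None).len(), 0);    // JS path: pool starts empty
}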
@@ -937,6 +938,8 @@ pub enum ReducerCallError {
     Args(#[from] InvalidReducerArguments),
     #[error(transparent)]
     NoSuchModule(#[from] NoSuchModule),
+    #[error("The reducer worker encountered a fatal error: {0}")]
+    WorkerError(String),
     #[error("no such reducer")]
     NoSuchReducer,
     #[error("no such scheduled reducer")]
@@ -1066,7 +1069,7 @@ impl ModuleHost {
             init_inst,
         } => {
             info = module.info();
-            let instance_manager = ModuleInstanceManager::new(module, init_inst, database_identity);
+            let instance_manager = ModuleInstanceManager::new(module, Some(init_inst), database_identity);
             Arc::new(ModuleHostInner::Wasm(WasmtimeModuleHost {
                 executor,
                 instance_manager,
@@ -1074,8 +1077,13 @@ impl ModuleHost {
         }
         ModuleWithInstance::Js { module, init_inst } => {
             info = module.info();
-            let instance_manager = ModuleInstanceManager::new(module, init_inst, database_identity);
-            Arc::new(ModuleHostInner::Js(V8ModuleHost { instance_manager }))
+            let instance_lane = super::v8::JsInstanceLane::new(module.clone(), init_inst);
+            let procedure_instances = ModuleInstanceManager::new(module.clone(), None, database_identity);
+            Arc::new(ModuleHostInner::Js(V8ModuleHost {
+                module,
+                instance_lane,
+                procedure_instances,
+            }))
         }
     };
     let on_panic = Arc::new(on_panic);
@@ -1143,18 +1151,13 @@ impl ModuleHost {
                 })
                 .await
             }
-            ModuleHostInner::Js(V8ModuleHost { instance_manager }) => {
-                instance_manager
-                    .with_instance(async |mut inst| {
-                        let res = inst
-                            .run_on_thread(async move || {
-                                drop(timer_guard);
-                                f().await
-                            })
-                            .await;
-                        (res, inst)
+            ModuleHostInner::Js(V8ModuleHost { instance_lane, .. }) => {
+                instance_lane
+                    .run_on_thread(async move || {
+                        drop(timer_guard);
+                        f().await
                     })
-                    .await
+                    .await?
             }
         })
     }
@@ -1193,7 +1196,7 @@ impl ModuleHost {
         arg: A,
         timer: impl FnOnce(&str) -> Guard,
         work_wasm: impl AsyncFnOnce(Guard, &SingleCoreExecutor, Box<ModuleInstance>, A) -> (R, Box<ModuleInstance>),
-        work_js: impl AsyncFnOnce(Guard, &mut JsInstance, A) -> R,
+        work_js: impl AsyncFnOnce(Guard, &super::v8::JsInstanceLane, A) -> R,
     ) -> Result<R, NoSuchModule> {
         self.guard_closed()?;
         let timer_guard = timer(label);
@@ -1220,11 +1223,7 @@ impl ModuleHost {
                     .with_instance(async |inst| work_wasm(timer_guard, executor, inst, arg).await)
                     .await
             }
-            ModuleHostInner::Js(V8ModuleHost { instance_manager }) => {
-                instance_manager
-                    .with_instance(async |mut inst| (work_js(timer_guard, &mut inst, arg).await, inst))
-                    .await
-            }
+            ModuleHostInner::Js(V8ModuleHost { instance_lane, .. }) => work_js(timer_guard, instance_lane, arg).await,
         })
     }
@@ -1237,7 +1236,7 @@ impl ModuleHost {
         label: &str,
         arg: A,
         wasm: impl AsyncFnOnce(A, &mut ModuleInstance) -> R + Send + 'static,
-        js: impl AsyncFnOnce(A, &mut JsInstance) -> R,
+        js: impl AsyncFnOnce(A, &super::v8::JsInstanceLane) -> R,
     ) -> Result<R, NoSuchModule>
     where
         R: Send + 'static,
@@ -1262,6 +1261,7 @@ impl ModuleHost {
                 .await
             },
             async move |timer_guard, inst, arg| {
+                super::v8::assert_not_on_js_module_thread(label);
                 drop(timer_guard);
                 js(arg, inst).await
             },
@@ -1269,6 +1269,70 @@ impl ModuleHost {
         .await
     }
 
+    /// Run a function for this module using pooled instances.
+    ///
+    /// For WASM, this is identical to [`Self::call`].
+    /// For V8/JS, this uses the pooled procedure instances instead of the
+    /// single instance lane.
+    async fn call_pooled<A, R>(
+        &self,
+        label: &str,
+        arg: A,
+        wasm: impl AsyncFnOnce(A, &mut ModuleInstance) -> R + Send + 'static,
+        js: impl AsyncFnOnce(A, &JsInstance) -> R,
+    ) -> Result<R, NoSuchModule>
+    where
+        R: Send + 'static,
+        A: Send + 'static,
+    {
+        self.guard_closed()?;
+        let timer_guard = self.start_call_timer(label);
+
+        scopeguard::defer_on_unwind!({
+            log::warn!("pooled operation {label} panicked");
+            (self.on_panic)();
+        });
+
+        Ok(match &*self.inner {
+            ModuleHostInner::Wasm(WasmtimeModuleHost {
+                executor,
+                instance_manager,
+            }) => {
+                instance_manager
+                    .with_instance(async |mut inst| {
+                        executor
+                            .run_job(async move || {
+                                drop(timer_guard);
+                                (wasm(arg, &mut inst).await, inst)
+                            })
+                            .await
+                    })
+                    .await
+            }
+            ModuleHostInner::Js(V8ModuleHost {
+                procedure_instances, ..
+            }) => {
+                procedure_instances
+                    .with_instance(async |inst| {
+                        drop(timer_guard);
+                        let res = js(arg, &inst).await;
+                        (res, inst)
+                    })
+                    .await
+            }
+        })
+    }
+
+    async fn call_view_command(&self, label: &str, cmd: ViewCommand) -> Result<ViewCommandResult, ViewCallError> {
+        self.call_pooled(
+            label,
+            cmd,
+            async |cmd, inst| Ok::<_, ViewCallError>(inst.call_view(cmd)),
+            async |cmd, inst| Ok::<_, ViewCallError>(inst.call_view(cmd).await),
+        )
+        .await?
+    }
+
     pub async fn disconnect_client(&self, client_id: ClientActorId) {
         log::trace!("disconnecting client {client_id}");
         if let Err(e) = self
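The pooled path checks an instance out, runs the work, and hands the instance back. A minimal sketch of that checkout/return shape, with a toy `Instance` type and a synchronous closure; the real manager also creates instances on demand and tracks metrics:

use std::collections::VecDeque;
use std::sync::Mutex;

struct Pool<I> {
    instances: Mutex<VecDeque<I>>,
}

impl<I> Pool<I> {
    fn with_instance<R>(&self, f: impl FnOnce(I) -> (R, I)) -> Option<R> {
        // Check an instance out so concurrent callers get distinct ones.
        let inst = self.instances.lock().unwrap().pop_front()?;
        let (res, inst) = f(inst);
        // Return the instance for the next caller.
        self.instances.lock().unwrap().push_back(inst);
        Some(res)
    }
}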
@@ -1536,14 +1600,13 @@ impl ModuleHost {
             args,
         };
 
-        Ok(self
-            .call(
-                &reducer_def.name,
-                call_reducer_params,
-                async |p, inst| inst.call_reducer(p),
-                async |p, inst| inst.call_reducer(p).await,
-            )
-            .await?)
+        self.call(
+            &reducer_def.name,
+            call_reducer_params,
+            async |p, inst| Ok(inst.call_reducer(p)),
+            async |p, inst| inst.call_reducer(p).await,
+        )
+        .await?
     }
 
     pub async fn call_reducer(
@@ -1611,12 +1674,7 @@ impl ModuleHost {
         };
 
         let res = self
-            .call(
-                "call_view_add_single_subscription",
-                cmd,
-                async |cmd, inst| inst.call_view(cmd),
-                async |cmd, inst| inst.call_view(cmd).await,
-            )
+            .call_view_command("call_view_add_single_subscription", cmd)
             .await
             //TODO: handle error better
             .map_err(|e| DBError::Other(anyhow::anyhow!(e)))?;
@@ -1644,12 +1702,7 @@ impl ModuleHost {
         };
 
         let res = self
-            .call(
-                "call_view_add_multi_subscription",
-                cmd,
-                async |cmd, inst| inst.call_view(cmd),
-                async |cmd, inst| inst.call_view(cmd).await,
-            )
+            .call_view_command("call_view_add_multi_subscription", cmd)
            .await
            //TODO: handle error better
            .map_err(|e| DBError::Other(anyhow::anyhow!(e)))?;
@@ -1677,12 +1730,7 @@ impl ModuleHost {
         };
 
         let res = self
-            .call(
-                "call_view_remove_v2_subscription",
-                cmd,
-                async |cmd, inst| inst.call_view(cmd),
-                async |cmd, inst| inst.call_view(cmd).await,
-            )
+            .call_view_command("call_view_remove_v2_subscription", cmd)
             .await
             .map_err(|e| DBError::Other(anyhow::anyhow!(e)))?;
@@ -1709,12 +1757,7 @@ impl ModuleHost {
         };
 
         let res = self
-            .call(
-                "call_view_add_multi_subscription",
-                cmd,
-                async |cmd, inst| inst.call_view(cmd),
-                async |cmd, inst| inst.call_view(cmd).await,
-            )
+            .call_view_command("call_view_add_multi_subscription", cmd)
             .await
             //TODO: handle error better
             .map_err(|e| DBError::Other(anyhow::anyhow!(e)))?;
@@ -1742,12 +1785,7 @@ impl ModuleHost {
         };
 
         let res = self
-            .call(
-                "call_view_add_legacy_subscription",
-                cmd,
-                async |cmd, inst| inst.call_view(cmd),
-                async |cmd, inst| inst.call_view(cmd).await,
-            )
+            .call_view_command("call_view_add_legacy_subscription", cmd)
             .await
             //TODO: handle error better
             .map_err(|e| DBError::Other(anyhow::anyhow!(e)))?;
@@ -1776,12 +1814,7 @@ impl ModuleHost {
         };
 
         let res = self
-            .call(
-                "call_view_sql",
-                cmd,
-                async |cmd, inst| inst.call_view(cmd),
-                async |cmd, inst| inst.call_view(cmd).await,
-            )
+            .call_view_command("call_view_sql", cmd)
             .await
             //TODO: handle error better
             .map_err(|e| DBError::Other(anyhow::anyhow!(e)))?;
@@ -1885,7 +1918,7 @@ impl ModuleHost {
         name: &str,
         params: CallProcedureParams,
     ) -> Result<CallProcedureReturn, ReducerCallError> {
-        self.call(
+        self.call_pooled(
             name,
             params,
             async move |params, inst| inst.call_procedure(params).await,
@@ -1897,14 +1930,14 @@ impl ModuleHost {
     pub(super) async fn call_scheduled_function(
         &self,
         params: ScheduledFunctionParams,
-    ) -> Result<CallScheduledFunctionResult, NoSuchModule> {
-        self.call(
+    ) -> Result<CallScheduledFunctionResult, CallScheduledFunctionError> {
+        self.call_pooled(
             "unknown scheduled function",
             params,
-            async move |params, inst| inst.call_scheduled_function(params).await,
-            async move |params, inst| inst.call_scheduled_function(params).await,
+            async move |params, inst| Ok(inst.call_scheduled_function(params).await),
+            async move |params, inst| Ok(inst.call_scheduled_function(params).await),
         )
-        .await
+        .await?
     }
 
     /// Materializes the views return by the `view_collector`, if not already materialized,
@@ -2467,14 +2500,14 @@ impl ModuleHost {
     pub(crate) fn replica_ctx(&self) -> &ReplicaContext {
         match &*self.inner {
             ModuleHostInner::Wasm(wasm) => wasm.instance_manager.module.replica_ctx(),
-            ModuleHostInner::Js(js) => js.instance_manager.module.replica_ctx(),
+            ModuleHostInner::Js(js) => js.module.replica_ctx(),
         }
     }
 
     fn scheduler(&self) -> &Scheduler {
         match &*self.inner {
             ModuleHostInner::Wasm(wasm) => wasm.instance_manager.module.scheduler(),
-            ModuleHostInner::Js(js) => js.instance_manager.module.scheduler(),
+            ModuleHostInner::Js(js) => js.module.scheduler(),
         }
     }
 }
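The new error plumbing leans on `?`-conversion. A minimal sketch, assuming the thiserror crate (already used by this codebase) and simplified stand-in types, of how `#[from]` derives the `From<NoSuchModule>` impl that lets call sites keep using `?`:

#[derive(Debug, thiserror::Error)]
#[error("no such module")]
struct NoSuchModule;

#[derive(Debug, thiserror::Error)]
enum CallScheduledFunctionError {
    #[error(transparent)]
    NoSuchModule(#[from] NoSuchModule),
}

fn lookup() -> Result<(), NoSuchModule> {
    Err(NoSuchModule)
}

fn call_scheduled() -> Result<(), CallScheduledFunctionError> {
    lookup()?; // `From<NoSuchModule>` converts the error here
    Ok(())
}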
diff --git a/crates/core/src/host/scheduler.rs b/crates/core/src/host/scheduler.rs
index 7328376c0..d3b285e9f 100644
--- a/crates/core/src/host/scheduler.rs
+++ b/crates/core/src/host/scheduler.rs
@@ -268,13 +268,21 @@ struct SchedulerActor {
     module_host: WeakModuleHost,
 }
 
+#[derive(Clone)]
 enum QueueItem {
     Id { id: ScheduledFunctionId, at: Timestamp },
     VolatileNonatomicImmediate { function_name: String, args: FunctionArgs },
 }
 
+#[derive(Clone)]
 pub(crate) struct ScheduledFunctionParams(QueueItem);
 
+#[derive(thiserror::Error, Debug)]
+pub(crate) enum CallScheduledFunctionError {
+    #[error(transparent)]
+    NoSuchModule(#[from] NoSuchModule),
+}
+
 #[cfg(target_pointer_width = "64")]
 spacetimedb_table::static_assert_size!(QueueItem, 64);
@@ -320,8 +328,8 @@ impl SchedulerActor {
     async fn handle_queued(&mut self, id: Expired<QueueItem>) {
         let item = id.into_inner();
-        let id: Option<ScheduledFunctionId> = match item {
-            QueueItem::Id { id, .. } => Some(id),
+        let id: Option<ScheduledFunctionId> = match &item {
+            QueueItem::Id { id, .. } => Some(*id),
             QueueItem::VolatileNonatomicImmediate { .. } => None,
         };
         if let Some(id) = id {
@@ -337,7 +345,7 @@ impl SchedulerActor {
         match result {
             // If the module already exited, leave the `ScheduledFunction` in
             // the database for when the module restarts.
-            Err(NoSuchModule) => {}
+            Err(CallScheduledFunctionError::NoSuchModule(_)) => {}
             Ok(CallScheduledFunctionResult { reschedule: None }) => {
                 // nothing to do
             }
diff --git a/crates/core/src/host/v8/mod.rs b/crates/core/src/host/v8/mod.rs
index b611fcaef..ffd871e2c 100644
--- a/crates/core/src/host/v8/mod.rs
+++ b/crates/core/src/host/v8/mod.rs
@@ -16,7 +16,7 @@ use crate::client::ClientActorId;
 use crate::host::host_controller::CallProcedureReturn;
 use crate::host::instance_env::{ChunkPool, InstanceEnv, TxSlot};
 use crate::host::module_host::{
-    call_identity_connected, init_database, ClientConnectedError, ViewCommand, ViewCommandResult,
+    call_identity_connected, init_database, ClientConnectedError, ViewCallError, ViewCommand, ViewCommandResult,
 };
 use crate::host::scheduler::{CallScheduledFunctionResult, ScheduledFunctionParams};
 use crate::host::wasm_common::instrumentation::CallTimes;
@@ -33,10 +33,10 @@ use crate::subscription::module_subscription_manager::TransactionOffset;
 use crate::util::jobs::{AllocatedJobCore, CorePinner, LoadBalanceOnDropGuard};
 use core::any::type_name;
 use core::str;
-use enum_as_inner::EnumAsInner;
 use futures::future::LocalBoxFuture;
 use futures::FutureExt;
 use itertools::Either;
+use parking_lot::RwLock;
 use spacetimedb_auth::identity::ConnectionAuthCtx;
 use spacetimedb_client_api_messages::energy::FunctionBudget;
 use spacetimedb_datastore::locking_tx_datastore::FuncCallType;
@@ -46,10 +46,12 @@ use spacetimedb_schema::auto_migrate::MigrationPolicy;
 use spacetimedb_schema::def::ModuleDef;
 use spacetimedb_schema::identifier::Identifier;
 use spacetimedb_table::static_assert_size;
+use std::cell::Cell;
 use std::panic::AssertUnwindSafe;
+use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
 use std::sync::{Arc, LazyLock};
 use std::time::Instant;
-use tokio::sync::oneshot;
+use tokio::sync::{oneshot, Mutex as AsyncMutex};
 use tracing::Instrument;
 use v8::script_compiler::{compile_module, Source};
 use v8::{
@@ -93,6 +95,53 @@ impl V8Runtime {
 }
 
 static V8_RUNTIME_GLOBAL: LazyLock<V8RuntimeInner> = LazyLock::new(V8RuntimeInner::init);
+static NEXT_JS_INSTANCE_ID: AtomicU64 = AtomicU64::new(1);
+
+thread_local! {
+    // Note: `on_module_thread` runs host closures on a single JS module thread.
+    // Enqueuing more JS module-thread work from one of those closures waits on
+    // the same worker thread that is already busy running the current closure,
+    // which deadlocks.
+    static ON_JS_MODULE_THREAD: Cell<bool> = const { Cell::new(false) };
+}
+
+struct EnteredJsModuleThread;
+
+impl EnteredJsModuleThread {
+    fn new() -> Self {
+        ON_JS_MODULE_THREAD.with(|entered| {
+            assert!(
+                !entered.get(),
+                "reentrancy into the JS module thread; this would deadlock. \
+                 Do not enqueue onto this worker from inside `on_module_thread` work."
+            );
+            entered.set(true);
+        });
+        Self
+    }
+}
+
+impl Drop for EnteredJsModuleThread {
+    fn drop(&mut self) {
+        ON_JS_MODULE_THREAD.with(|entered| {
+            debug_assert!(
+                entered.get(),
+                "JS module thread marker should only be cleared after entry"
+            );
+            entered.set(false);
+        });
+    }
+}
+
+pub(crate) fn assert_not_on_js_module_thread(label: &str) {
+    ON_JS_MODULE_THREAD.with(|entered| {
+        assert!(
+            !entered.get(),
+            "{label} attempted to re-enter the JS module thread from code already \
+             running on that thread; this would deadlock"
+        );
+    });
+}
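A minimal standalone sketch of the guard pattern above, std only: entering twice on one thread panics instead of deadlocking, and `Drop` re-arms the flag on every exit path, including unwinding.

use std::cell::Cell;

thread_local! {
    static IN_CRITICAL: Cell<bool> = const { Cell::new(false) };
}

struct Entered;

impl Entered {
    fn new() -> Self {
        IN_CRITICAL.with(|e| {
            assert!(!e.get(), "re-entry would deadlock");
            e.set(true);
        });
        Entered
    }
}

impl Drop for Entered {
    fn drop(&mut self) {
        IN_CRITICAL.with(|e| e.set(false));
    }
}

fn main() {
    let _outer = Entered::new();
    // A nested `Entered::new()` here would panic rather than deadlock.
}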
 /// The actual V8 runtime, with initialization of V8.
 struct V8RuntimeInner {
@@ -290,52 +339,52 @@ impl JsInstanceEnv {
 /// The actual work happens in a worker thread,
 /// which the instance communicates with through channels.
 ///
-/// When the instance is dropped, the channels will hang up,
-/// which will cause the worker's loop to terminate
-/// and cleanup the isolate and friends.
+/// This handle is cloneable and shared by callers. Requests are queued FIFO
+/// on the worker thread so the next reducer can start immediately after the
+/// previous one finishes, without waiting for an outer task to hand the
+/// instance back.
+///
+/// When the last handle is dropped, the channels will hang up,
+/// which will cause the worker's loop to terminate and clean up the isolate
+/// and friends.
+#[derive(Clone)]
 pub struct JsInstance {
+    /// Stable identifier for the underlying worker generation.
+    ///
+    /// All clones of the same handle share the same `id`. The instance lane uses
+    /// it to tell whether the currently active worker has already been replaced
+    /// after a trap or disconnect.
+    id: u64,
     request_tx: flume::Sender<JsWorkerRequest>,
-    reply_rx: flume::Receiver<(JsWorkerReply, bool)>,
-    trapped: bool,
+    trapped: Arc<AtomicBool>,
 }
 
 impl JsInstance {
+    fn id(&self) -> u64 {
+        self.id
+    }
+
     pub fn trapped(&self) -> bool {
-        self.trapped
+        self.trapped.load(Ordering::Relaxed)
     }
 
-    /// Send a request to the worker and wait for a reply.
-    async fn send_recv<T>(
-        &mut self,
-        extract: impl FnOnce(JsWorkerReply) -> Result<T, JsWorkerReply>,
-        request: JsWorkerRequest,
-    ) -> T {
-        // Send the request.
+    async fn send_request<T>(
+        &self,
+        request: impl FnOnce(JsReplyTx<T>) -> JsWorkerRequest,
+    ) -> Result<T, WorkerDisconnected> {
+        let (reply_tx, reply_rx) = oneshot::channel();
         self.request_tx
-            .send_async(request)
+            .send_async(request(reply_tx))
             .await
-            .expect("worker's `request_rx` should be live as `JsInstance::drop` hasn't happened");
-
-        // Wait for the response.
-        let (reply, trapped) = self
-            .reply_rx
-            .recv_async()
-            .await
-            .expect("worker's `reply_tx` should be live as `JsInstance::drop` hasn't happened");
-
-        self.trapped = trapped;
-
-        match extract(reply) {
-            Err(err) => unreachable!("should have received {} but got {err:?}", type_name::<T>()),
-            Ok(reply) => reply,
+            .map_err(|_| WorkerDisconnected)?;
+        let JsWorkerReply { value, trapped } = reply_rx.await.map_err(|_| WorkerDisconnected)?;
+        if trapped {
+            self.trapped.store(true, Ordering::Relaxed);
         }
+        Ok(value)
     }
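Because the trapped flag lives behind an `Arc`, every clone of the handle observes a trap recorded through any other clone. A minimal sketch, std only, not from the patch:

use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

#[derive(Clone)]
struct Handle {
    trapped: Arc<AtomicBool>,
}

fn main() {
    let a = Handle { trapped: Arc::new(AtomicBool::new(false)) };
    let b = a.clone();
    a.trapped.store(true, Ordering::Relaxed); // one clone records the trap
    assert!(b.trapped.load(Ordering::Relaxed)); // all clones see it
}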
 
-    /// Run the given function on the worker thread.
-    ///
-    /// This method, unlike the others, does not expect a response on the
-    /// `reply_rx` channel, since the return value `R` could be of any type.
-    pub async fn run_on_thread<F, R>(&mut self, f: F) -> R
+    pub async fn run_on_thread<F, R>(&self, f: F) -> R
     where
         F: AsyncFnOnce() -> R + Send + 'static,
         R: Send + 'static,
@@ -343,145 +392,137 @@ impl JsInstance {
         let span = tracing::Span::current();
         let (tx, rx) = oneshot::channel();
 
-        let request = JsWorkerRequest::RunFunction(Box::new(move || {
-            async move {
-                let result = AssertUnwindSafe(f().instrument(span)).catch_unwind().await;
-                if let Err(Err(_panic)) = tx.send(result) {
-                    tracing::warn!("uncaught panic on `SingleCoreExecutor`")
-                }
-            }
-            .boxed_local()
-        }));
-
         self.request_tx
-            .send_async(request)
+            .send_async(JsWorkerRequest::RunFunction(Box::new(move || {
+                async move {
+                    let result = AssertUnwindSafe(f().instrument(span)).catch_unwind().await;
+                    if let Err(Err(_panic)) = tx.send(result) {
+                        tracing::warn!("uncaught panic on `SingleCoreExecutor`")
+                    }
+                }
+                .boxed_local()
+            })))
            .await
-            .expect("worker's `request_rx` should be live as `JsInstance::drop` hasn't happened");
+            .unwrap_or_else(|_| panic!("worker should stay live while handling {}", type_name::<R>()));
 
-        match rx.await.unwrap() {
+        match rx
+            .await
+            .unwrap_or_else(|_| panic!("worker should stay live while handling {}", type_name::<R>()))
+        {
             Ok(r) => r,
             Err(e) => std::panic::resume_unwind(e),
         }
     }
 
     pub async fn update_database(
-        &mut self,
+        &self,
         program: Program,
         old_module_info: Arc<ModuleInfo>,
         policy: MigrationPolicy,
     ) -> anyhow::Result<UpdateDatabaseResult> {
-        self.send_recv(
-            JsWorkerReply::into_update_database,
-            JsWorkerRequest::UpdateDatabase {
-                program,
-                old_module_info,
-                policy,
-            },
-        )
+        self.send_request(|reply_tx| JsWorkerRequest::UpdateDatabase {
+            reply_tx,
+            program,
+            old_module_info,
+            policy,
+        })
         .await
+        .unwrap_or_else(|_| panic!("worker should stay live while updating the database"))
     }
 
-    pub async fn call_reducer(&mut self, params: CallReducerParams) -> ReducerCallResult {
-        self.send_recv(
-            JsWorkerReply::into_call_reducer,
-            JsWorkerRequest::CallReducer { params },
-        )
-        .await
-    }
-
-    pub async fn clear_all_clients(&mut self) -> anyhow::Result<()> {
-        self.send_recv(JsWorkerReply::into_clear_all_clients, JsWorkerRequest::ClearAllClients)
+    pub async fn call_reducer(&self, params: CallReducerParams) -> ReducerCallResult {
+        self.send_request(|reply_tx| JsWorkerRequest::CallReducer { reply_tx, params })
             .await
+            .unwrap_or_else(|_| panic!("worker should stay live while calling a reducer"))
+    }
+
+    pub async fn clear_all_clients(&self) -> anyhow::Result<()> {
+        self.send_request(JsWorkerRequest::ClearAllClients)
+            .await
+            .unwrap_or_else(|_| panic!("worker should stay live while clearing clients"))
     }
 
     pub async fn call_identity_connected(
-        &mut self,
+        &self,
         caller_auth: ConnectionAuthCtx,
         caller_connection_id: ConnectionId,
     ) -> Result<(), ClientConnectedError> {
-        self.send_recv(
-            JsWorkerReply::into_call_identity_connected,
-            JsWorkerRequest::CallIdentityConnected(caller_auth, caller_connection_id),
-        )
+        self.send_request(|reply_tx| JsWorkerRequest::CallIdentityConnected {
+            reply_tx,
+            caller_auth,
+            caller_connection_id,
+        })
        .await
+        .unwrap_or_else(|_| panic!("worker should stay live while running client_connected"))
     }
 
     pub async fn call_identity_disconnected(
-        &mut self,
+        &self,
         caller_identity: Identity,
         caller_connection_id: ConnectionId,
     ) -> Result<(), ReducerCallError> {
-        self.send_recv(
-            JsWorkerReply::into_call_identity_disconnected,
-            JsWorkerRequest::CallIdentityDisconnected(caller_identity, caller_connection_id),
-        )
+        self.send_request(|reply_tx| JsWorkerRequest::CallIdentityDisconnected {
+            reply_tx,
+            caller_identity,
+            caller_connection_id,
+        })
        .await
+        .unwrap_or_else(|_| panic!("worker should stay live while running client_disconnected"))
     }
 
-    pub async fn disconnect_client(&mut self, client_id: ClientActorId) -> Result<(), ReducerCallError> {
-        self.send_recv(
-            JsWorkerReply::into_disconnect_client,
-            JsWorkerRequest::DisconnectClient(client_id),
-        )
-        .await
-    }
-
-    pub async fn init_database(&mut self, program: Program) -> anyhow::Result<Option<ReducerCallResult>> {
-        *self
-            .send_recv(
-                JsWorkerReply::into_init_database,
-                JsWorkerRequest::InitDatabase(program),
-            )
+    pub async fn disconnect_client(&self, client_id: ClientActorId) -> Result<(), ReducerCallError> {
+        self.send_request(|reply_tx| JsWorkerRequest::DisconnectClient { reply_tx, client_id })
             .await
+            .unwrap_or_else(|_| panic!("worker should stay live while disconnecting a client"))
     }
 
-    pub async fn call_procedure(&mut self, params: CallProcedureParams) -> CallProcedureReturn {
-        *self
-            .send_recv(
-                JsWorkerReply::into_call_procedure,
-                JsWorkerRequest::CallProcedure { params },
-            )
+    pub async fn init_database(&self, program: Program) -> anyhow::Result<Option<ReducerCallResult>> {
+        self.send_request(|reply_tx| JsWorkerRequest::InitDatabase { reply_tx, program })
            .await
+            .unwrap_or_else(|_| panic!("worker should stay live while initializing the database"))
    }
 
-    pub async fn call_view(&mut self, cmd: ViewCommand) -> ViewCommandResult {
-        *self
-            .send_recv(JsWorkerReply::into_call_view, JsWorkerRequest::CallView { cmd })
+    pub async fn call_procedure(&self, params: CallProcedureParams) -> CallProcedureReturn {
+        self.send_request(|reply_tx| JsWorkerRequest::CallProcedure { reply_tx, params })
             .await
+            .unwrap_or_else(|_| panic!("worker should stay live while calling a procedure"))
+    }
+
+    pub async fn call_view(&self, cmd: ViewCommand) -> ViewCommandResult {
+        self.send_request(|reply_tx| JsWorkerRequest::CallView { reply_tx, cmd })
+            .await
+            .unwrap_or_else(|_| panic!("worker should stay live while calling a view"))
    }
 
     pub(in crate::host) async fn call_scheduled_function(
-        &mut self,
+        &self,
         params: ScheduledFunctionParams,
     ) -> CallScheduledFunctionResult {
-        self.send_recv(
-            JsWorkerReply::into_call_scheduled_function,
-            JsWorkerRequest::CallScheduledFunction(params),
-        )
-        .await
+        self.send_request(|reply_tx| JsWorkerRequest::CallScheduledFunction { reply_tx, params })
+            .await
+            .unwrap_or_else(|_| panic!("worker should stay live while calling a scheduled function"))
    }
 }
 
-/// A reply from the worker in [`spawn_instance_worker`].
-#[derive(EnumAsInner, Debug)]
-enum JsWorkerReply {
-    UpdateDatabase(anyhow::Result<UpdateDatabaseResult>),
-    CallReducer(ReducerCallResult),
-    CallView(Box<ViewCommandResult>),
-    CallProcedure(Box<CallProcedureReturn>),
-    ClearAllClients(anyhow::Result<()>),
-    CallIdentityConnected(Result<(), ClientConnectedError>),
-    CallIdentityDisconnected(Result<(), ReducerCallError>),
-    DisconnectClient(Result<(), ReducerCallError>),
-    InitDatabase(Box<anyhow::Result<Option<ReducerCallResult>>>),
-    CallScheduledFunction(CallScheduledFunctionResult),
+#[derive(Clone, Copy, Debug)]
+struct WorkerDisconnected;
+
+fn instance_lane_worker_error(label: &'static str) -> String {
+    format!("instance lane worker exited while handling {label}")
 }
 
-static_assert_size!(JsWorkerReply, 48);
+struct JsWorkerReply<T> {
+    value: T,
+    trapped: bool,
+}
 
-/// A request for the worker in [`spawn_instance_worker`].
-// We care about optimizing for `CallReducer` as it happens frequently,
-// so we don't want to box anything in it.
+type JsReplyTx<T> = oneshot::Sender<JsWorkerReply<T>>;
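A minimal sketch of the typed request/reply shape this replaces the `EnumAsInner` reply enum with, assuming the flume and tokio crates already used by this file; each request carries its own `oneshot::Sender`, so the reply arrives already typed and no runtime extraction is needed:

use tokio::sync::oneshot;

struct Reply<T> {
    value: T,
    trapped: bool,
}

enum Request {
    Add { reply_tx: oneshot::Sender<Reply<u64>>, a: u64, b: u64 },
}

async fn worker(rx: flume::Receiver<Request>) {
    while let Ok(req) = rx.recv_async().await {
        match req {
            Request::Add { reply_tx, a, b } => {
                // The reply is typed per request; no enum variant matching needed.
                let _ = reply_tx.send(Reply { value: a + b, trapped: false });
            }
        }
    }
}

async fn client(tx: flume::Sender<Request>) -> Option<u64> {
    let (reply_tx, reply_rx) = oneshot::channel();
    tx.send_async(Request::Add { reply_tx, a: 2, b: 3 }).await.ok()?;
    let reply = reply_rx.await.ok()?; // a dropped sender means the worker died
    Some(reply.value)
}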
+
+/// Requests sent to the dedicated JS worker thread.
+///
+/// Most variants carry a `reply_tx` because the worker thread owns the isolate,
+/// executes the request there, and then has to send both the typed result and
+/// the worker's trapped-bit back to the async caller.
 enum JsWorkerRequest {
     /// See [`JsInstance::run_on_thread`].
     ///
@@ -489,34 +530,327 @@ enum JsWorkerRequest {
     RunFunction(Box<dyn FnOnce() -> LocalBoxFuture<'static, ()> + Send>),
     /// See [`JsInstance::update_database`].
     UpdateDatabase {
+        reply_tx: JsReplyTx<anyhow::Result<UpdateDatabaseResult>>,
         program: Program,
         old_module_info: Arc<ModuleInfo>,
         policy: MigrationPolicy,
     },
     /// See [`JsInstance::call_reducer`].
-    CallReducer { params: CallReducerParams },
+    CallReducer {
+        reply_tx: JsReplyTx<ReducerCallResult>,
+        params: CallReducerParams,
+    },
     /// See [`JsInstance::call_view`].
-    CallView { cmd: ViewCommand },
+    CallView {
+        reply_tx: JsReplyTx<ViewCommandResult>,
+        cmd: ViewCommand,
+    },
     /// See [`JsInstance::call_procedure`].
-    CallProcedure { params: CallProcedureParams },
+    CallProcedure {
+        reply_tx: JsReplyTx<CallProcedureReturn>,
+        params: CallProcedureParams,
+    },
     /// See [`JsInstance::clear_all_clients`].
-    ClearAllClients,
+    ClearAllClients(JsReplyTx<anyhow::Result<()>>),
     /// See [`JsInstance::call_identity_connected`].
-    CallIdentityConnected(ConnectionAuthCtx, ConnectionId),
+    CallIdentityConnected {
+        reply_tx: JsReplyTx<Result<(), ClientConnectedError>>,
+        caller_auth: ConnectionAuthCtx,
+        caller_connection_id: ConnectionId,
+    },
     /// See [`JsInstance::call_identity_disconnected`].
-    CallIdentityDisconnected(Identity, ConnectionId),
+    CallIdentityDisconnected {
+        reply_tx: JsReplyTx<Result<(), ReducerCallError>>,
+        caller_identity: Identity,
+        caller_connection_id: ConnectionId,
+    },
     /// See [`JsInstance::disconnect_client`].
-    DisconnectClient(ClientActorId),
+    DisconnectClient {
+        reply_tx: JsReplyTx<Result<(), ReducerCallError>>,
+        client_id: ClientActorId,
+    },
     /// See [`JsInstance::init_database`].
-    InitDatabase(Program),
+    InitDatabase {
+        reply_tx: JsReplyTx<anyhow::Result<Option<ReducerCallResult>>>,
+        program: Program,
+    },
     /// See [`JsInstance::call_scheduled_function`].
-    CallScheduledFunction(ScheduledFunctionParams),
+    CallScheduledFunction {
+        reply_tx: JsReplyTx<CallScheduledFunctionResult>,
+        params: ScheduledFunctionParams,
+    },
 }
 
-// These two should be the same size (once core pinning PR lands).
-static_assert_size!(JsWorkerRequest, 192);
 static_assert_size!(CallReducerParams, 192);
 
+fn send_worker_reply<T>(ctx: &str, reply_tx: JsReplyTx<T>, value: T, trapped: bool) {
+    if reply_tx.send(JsWorkerReply { value, trapped }).is_err() {
+        log::error!("should have receiver for `{ctx}` response");
+    }
+}
+
+struct JsInstanceLaneState {
+    // Instance-lane calls stay on one active worker for locality. The hot path clones
+    // this handle and feeds work straight into the worker-owned FIFO; trap
+    // recovery very rarely swaps it out for a fresh instance.
+    active: RwLock<JsInstance>,
+
+    // Replacement must be serialized because multiple callers can all observe
+    // the same trapped worker and attempt recovery at once.
+    //
+    // This must be an async mutex rather than a blocking mutex because recovery
+    // may need to call `create_instance().await`.
+    //
+    // This stays safe as long as these invariants hold:
+    // - `replace_lock` is only for trap/disconnect recovery, never the hot path.
+    // - no `parking_lot` guard is held across the `.await` in replacement.
+    // - `create_instance()` must not call back into `JsInstanceLane` or try to
+    //   take `replace_lock`.
+    replace_lock: AsyncMutex<()>,
+}
+
+/// A single serialized execution lane for JS module work.
+///
+/// Callers share one active [`JsInstance`] so hot requests stay on the same
+/// worker thread for locality. The lane only steps in on the rare path where
+/// a trap or disconnect forces that active worker to be replaced.
+#[derive(Clone)]
+pub struct JsInstanceLane {
+    module: JsModule,
+    state: Arc<JsInstanceLaneState>,
+}
+
+impl JsInstanceLane {
+    pub fn new(module: JsModule, init_inst: JsInstance) -> Self {
+        Self {
+            module,
+            state: Arc::new(JsInstanceLaneState {
+                active: RwLock::new(init_inst),
+                replace_lock: AsyncMutex::new(()),
+            }),
+        }
+    }
+
+    fn active_instance(&self) -> JsInstance {
+        self.state.active.read().clone()
+    }
+
+    async fn after_successful_call(&self, active: &JsInstance) {
+        if active.trapped() {
+            self.replace_active_if_current(active).await;
+        }
+    }
+
+    async fn replace_active_if_current(&self, trapped: &JsInstance) {
+        // `replace_lock` intentionally serializes the rare recovery path. This
+        // prevents a trap observed by many callers from spawning many replacement
+        // workers and racing to install them.
+        let _replace_guard = self.state.replace_lock.lock().await;
+
+        // The same trapped instance can be observed by multiple callers at once.
+        // We only want the first one to do the swap; everybody else should notice
+        // that the active handle already changed and get out of the way.
+        if self.state.active.read().id() != trapped.id() {
+            return;
+        }
+
+        log::warn!("instance lane worker trapped; creating a fresh instance-lane worker");
+
+        // Keep the awaited instance creation outside of any `parking_lot` guard.
+        // The only lock held across this await is `replace_lock`, which is why it
+        // has to be async.
+        let next = self.module.create_instance().await;
+        *self.state.active.write() = next;
+    }
+
+    /// Run an instance-lane operation exactly once.
+    ///
+    /// If the worker disappears before replying, we replace it for future
+    /// requests but surface the disconnect to the caller instead of retrying.
+    /// This keeps instance-lane semantics closer to the old pooled-instance
+    /// model now that the worker queue is a rendezvous channel.
+    async fn run_once<T>(
+        &self,
+        label: &'static str,
+        work: impl AsyncFnOnce(JsInstance) -> Result<T, WorkerDisconnected>,
+    ) -> Result<T, WorkerDisconnected> {
+        assert_not_on_js_module_thread(label);
+
+        let active = self.active_instance();
+        let result = work(active.clone()).await;
+        match result {
+            Ok(value) => {
+                self.after_successful_call(&active).await;
+                Ok(value)
+            }
+            Err(err) => {
+                self.replace_active_if_current(&active).await;
+                log::error!("instance-lane operation {label} lost its worker before replying");
+                Err(err)
+            }
+        }
+    }
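The swap-if-current rule above is the core of trap recovery: only the caller that still holds the current generation installs a replacement, and losers of the race leave the newer worker alone. A minimal synchronous sketch, std only, not from the patch:

use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Mutex;

struct Worker {
    id: u64,
}

struct Lane {
    active: Mutex<Worker>,
    next_id: AtomicU64,
}

impl Lane {
    fn replace_if_current(&self, observed: &Worker) {
        let mut active = self.active.lock().unwrap();
        if active.id != observed.id {
            return; // a newer worker is already installed; leave it alone
        }
        let id = self.next_id.fetch_add(1, Ordering::Relaxed);
        *active = Worker { id };
    }
}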
 
+    /// Run an arbitrary closure on the instance-lane worker thread without replay.
+    ///
+    /// This is non-replayable because the closure is opaque host code, not a
+    /// cloneable request payload, and it may have already produced host-side
+    /// effects before a worker disconnect is observed.
+    pub async fn run_on_thread<F, R>(&self, f: F) -> anyhow::Result<R>
+    where
+        F: AsyncFnOnce() -> R + Send + 'static,
+        R: Send + 'static,
+    {
+        let span = tracing::Span::current();
+        self.run_once("run_on_thread", async move |inst| {
+            let (tx, rx) = oneshot::channel();
+
+            inst.request_tx
+                .send_async(JsWorkerRequest::RunFunction(Box::new(move || {
+                    async move {
+                        let _on_js_module_thread = EnteredJsModuleThread::new();
+                        let result = AssertUnwindSafe(f().instrument(span)).catch_unwind().await;
+                        if let Err(Err(_panic)) = tx.send(result) {
+                            tracing::warn!("uncaught panic on `SingleCoreExecutor`")
+                        }
+                    }
+                    .boxed_local()
+                })))
+                .await
+                .map_err(|_| WorkerDisconnected)?;
+
+            Ok(match rx.await.map_err(|_| WorkerDisconnected)? {
+                Ok(r) => r,
+                Err(e) => std::panic::resume_unwind(e),
+            })
+        })
+        .await
+        .map_err(|_| anyhow::anyhow!("instance lane worker exited while running a non-replayable module-thread task"))
+    }
+
+    /// Run a database update on the instance lane exactly once.
+    ///
+    /// If the worker disappears before replying, we replace it for future
+    /// requests and surface an internal host error to the caller.
+    pub async fn update_database(
+        &self,
+        program: Program,
+        old_module_info: Arc<ModuleInfo>,
+        policy: MigrationPolicy,
+    ) -> anyhow::Result<UpdateDatabaseResult> {
+        self.run_once("update_database", |inst: JsInstance| async move {
+            inst.send_request(|reply_tx| JsWorkerRequest::UpdateDatabase {
+                reply_tx,
+                program,
+                old_module_info,
+                policy,
+            })
+            .await
+        })
+        .await
+        .map_err(|_| anyhow::anyhow!(instance_lane_worker_error("update_database")))?
+    }
+
+    /// Run a reducer on the instance lane exactly once.
+    ///
+    /// A real reducer trap still returns the reducer's own outcome before the
+    /// worker is replaced. If the worker disappears before any reply, we surface
+    /// that as `ReducerCallError::WorkerError`.
+    pub async fn call_reducer(&self, params: CallReducerParams) -> Result<ReducerCallResult, ReducerCallError> {
+        self.run_once("call_reducer", |inst: JsInstance| async move {
+            inst.send_request(|reply_tx| JsWorkerRequest::CallReducer { reply_tx, params })
+                .await
+        })
+        .await
+        .map_err(|_| ReducerCallError::WorkerError(instance_lane_worker_error("call_reducer")))
+    }
+
+    /// Clear all instance-lane client state exactly once.
+    pub async fn clear_all_clients(&self) -> anyhow::Result<()> {
+        self.run_once("clear_all_clients", |inst: JsInstance| async move {
+            inst.send_request(JsWorkerRequest::ClearAllClients).await
+        })
+        .await
+        .map_err(|_| anyhow::anyhow!(instance_lane_worker_error("clear_all_clients")))?
+    }
+
+    /// Run the `client_connected` lifecycle reducer exactly once.
+    ///
+    /// If the worker disappears before replying, we replace it for future
+    /// requests and reject the connection with `ReducerCallError::WorkerError`.
+    pub async fn call_identity_connected(
+        &self,
+        caller_auth: ConnectionAuthCtx,
+        caller_connection_id: ConnectionId,
+    ) -> Result<(), ClientConnectedError> {
+        self.run_once("call_identity_connected", |inst: JsInstance| async move {
+            inst.send_request(|reply_tx| JsWorkerRequest::CallIdentityConnected {
+                reply_tx,
+                caller_auth,
+                caller_connection_id,
+            })
+            .await
+        })
+        .await
+        .map_err(|_| {
+            ClientConnectedError::from(ReducerCallError::WorkerError(instance_lane_worker_error(
+                "call_identity_connected",
+            )))
+        })?
+    }
+
+    /// Run the `client_disconnected` lifecycle reducer exactly once.
+    pub async fn call_identity_disconnected(
+        &self,
+        caller_identity: Identity,
+        caller_connection_id: ConnectionId,
+    ) -> Result<(), ReducerCallError> {
+        self.run_once("call_identity_disconnected", |inst: JsInstance| async move {
+            inst.send_request(|reply_tx| JsWorkerRequest::CallIdentityDisconnected {
+                reply_tx,
+                caller_identity,
+                caller_connection_id,
+            })
+            .await
+        })
+        .await
+        .map_err(|_| ReducerCallError::WorkerError(instance_lane_worker_error("call_identity_disconnected")))?
+    }
 
+    /// Run disconnect cleanup on the instance lane exactly once.
+    pub async fn disconnect_client(&self, client_id: ClientActorId) -> Result<(), ReducerCallError> {
+        self.run_once("disconnect_client", |inst: JsInstance| async move {
+            inst.send_request(|reply_tx| JsWorkerRequest::DisconnectClient { reply_tx, client_id })
+                .await
+        })
+        .await
+        .map_err(|_| ReducerCallError::WorkerError(instance_lane_worker_error("disconnect_client")))?
+    }
+
+    /// Run reducer-style database initialization exactly once.
+    pub async fn init_database(&self, program: Program) -> anyhow::Result<Option<ReducerCallResult>> {
+        self.run_once("init_database", |inst: JsInstance| async move {
+            inst.send_request(|reply_tx| JsWorkerRequest::InitDatabase { reply_tx, program })
+                .await
+        })
+        .await
+        .map_err(|_| anyhow::anyhow!(instance_lane_worker_error("init_database")))?
+    }
+
+    /// Run a view/subscription command on the instance lane exactly once.
+    ///
+    /// If the worker disappears before replying, we replace it for future
+    /// requests and surface a `ViewCallError::InternalError`.
+    pub async fn call_view(&self, cmd: ViewCommand) -> Result<ViewCommandResult, ViewCallError> {
+        self.run_once("call_view", |inst: JsInstance| async move {
+            inst.send_request(|reply_tx| JsWorkerRequest::CallView { reply_tx, cmd })
+                .await
+        })
+        .await
+        .map_err(|_| ViewCallError::InternalError(instance_lane_worker_error("call_view")))
+    }
+}
+
 /// Performs some of the startup work of [`spawn_instance_worker`].
 ///
 /// NOTE(centril): in its own function due to lack of `try` blocks.
@@ -575,13 +909,11 @@ async fn spawn_instance_worker(
     load_balance_guard: Arc<LoadBalanceOnDropGuard>,
     mut core_pinner: CorePinner,
 ) -> anyhow::Result<(ModuleCommon, JsInstance)> {
-    // Spawn channels for bidirectional communication between worker and instance.
-    // The use-case is SPSC and all channels are rendezvous channels
-    // where each `.send` blocks until it's received.
-    // The Instance --Request-> Worker channel:
+    // Spawn a rendezvous queue for requests to the worker.
+    // Multiple callers can wait to hand work to the worker, but with
+    // `bounded(0)` there is no buffered backlog inside the channel itself.
+    // The worker still processes requests strictly one at a time.
     let (request_tx, request_rx) = flume::bounded(0);
-    // The Worker --Reply-> Instance channel:
-    let (reply_tx, reply_rx) = flume::bounded(0);
 
     // This one-shot channel is used for initial startup error handling within the thread.
     let (result_tx, result_rx) = oneshot::channel();
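A minimal sketch of the rendezvous semantics, assuming the flume and tokio crates: with `bounded(0)`, a `send_async` only completes when the worker actually takes the request, so pending work queues in the waiting senders rather than inside the channel.

async fn rendezvous_demo() {
    let (tx, rx) = flume::bounded::<u32>(0);
    let sender = tokio::spawn(async move {
        tx.send_async(1).await.unwrap(); // parks until the receive below happens
    });
    assert_eq!(rx.recv_async().await.unwrap(), 1);
    sender.await.unwrap();
}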
@@ -659,65 +991,63 @@ async fn spawn_instance_worker(
 
         // Process requests to the worker.
         //
-        // The loop is terminated when a `JsInstance` is dropped.
+        // The loop is terminated when the last `JsInstance` handle is dropped.
         // This will cause channels, scopes, and the isolate to be cleaned up.
-        let reply = |ctx: &str, reply: JsWorkerReply, trapped| {
-            if let Err(e) = reply_tx.send((reply, trapped)) {
-                // This should never happen as `JsInstance::$function` immediately
-                // does `.recv` on the other end of the channel, though sometimes
-                // it gets cancelled.
-                log::error!("should have receiver for `{ctx}` response, {e}");
-            }
-        };
         for request in request_rx.iter() {
             let mut call_reducer = |tx, params| instance_common.call_reducer_with_tx(tx, params, &mut inst);
+            let mut should_exit = false;
             core_pinner.pin_if_changed();
 
-            use JsWorkerReply::*;
             match request {
                 JsWorkerRequest::RunFunction(f) => rt.block_on(f()),
                 JsWorkerRequest::UpdateDatabase {
+                    reply_tx,
                     program,
                     old_module_info,
                     policy,
                 } => {
-                    // Update the database and reply to `JsInstance::update_database`.
                     let res = instance_common.update_database(program, old_module_info, policy, &mut inst);
-                    reply("update_database", UpdateDatabase(res), false);
+                    send_worker_reply("update_database", reply_tx, res, false);
                 }
-                JsWorkerRequest::CallReducer { params } => {
-                    // Call the reducer.
-                    // If execution trapped, we don't end the loop here,
-                    // but rather let this happen by `return_instance` using `JsInstance::trapped`
-                    // which will cause `JsInstance` to be dropped,
-                    // which in turn results in the loop being terminated.
+                JsWorkerRequest::CallReducer { reply_tx, params } => {
                     let (res, trapped) = call_reducer(None, params);
-                    reply("call_reducer", CallReducer(res), trapped);
+                    send_worker_reply("call_reducer", reply_tx, res, trapped);
+                    should_exit = trapped;
                 }
-                JsWorkerRequest::CallView { cmd } => {
+                JsWorkerRequest::CallView { reply_tx, cmd } => {
                     let (res, trapped) = instance_common.handle_cmd(cmd, &mut inst);
-                    reply("call_view", JsWorkerReply::CallView(res.into()), trapped);
+                    send_worker_reply("call_view", reply_tx, res, trapped);
+                    should_exit = trapped;
                 }
-                JsWorkerRequest::CallProcedure { params } => {
+                JsWorkerRequest::CallProcedure { reply_tx, params } => {
                     let (res, trapped) = instance_common
                         .call_procedure(params, &mut inst)
                         .now_or_never()
                         .expect("our call_procedure implementation is not actually async");
-
-                    reply("call_procedure", JsWorkerReply::CallProcedure(res.into()), trapped);
+                    send_worker_reply("call_procedure", reply_tx, res, trapped);
+                    should_exit = trapped;
                 }
-                JsWorkerRequest::ClearAllClients => {
+                JsWorkerRequest::ClearAllClients(reply_tx) => {
                     let res = instance_common.clear_all_clients();
-                    reply("clear_all_clients", ClearAllClients(res), false);
+                    send_worker_reply("clear_all_clients", reply_tx, res, false);
                 }
-                JsWorkerRequest::CallIdentityConnected(caller_auth, caller_connection_id) => {
+                JsWorkerRequest::CallIdentityConnected {
+                    reply_tx,
+                    caller_auth,
+                    caller_connection_id,
+                } => {
                     let mut trapped = false;
                     let res =
                         call_identity_connected(caller_auth, caller_connection_id, info, call_reducer, &mut trapped);
-                    reply("call_identity_connected", CallIdentityConnected(res), trapped);
+                    send_worker_reply("call_identity_connected", reply_tx, res, trapped);
+                    should_exit = trapped;
                 }
-                JsWorkerRequest::CallIdentityDisconnected(caller_identity, caller_connection_id) => {
+                JsWorkerRequest::CallIdentityDisconnected {
+                    reply_tx,
+                    caller_identity,
+                    caller_connection_id,
+                } => {
                     let mut trapped = false;
                     let res = ModuleHost::call_identity_disconnected_inner(
                         caller_identity,
@@ -726,26 +1056,38 @@ async fn spawn_instance_worker(
                         call_reducer,
                         &mut trapped,
                     );
-                    reply("call_identity_disconnected", CallIdentityDisconnected(res), trapped);
+                    send_worker_reply("call_identity_disconnected", reply_tx, res, trapped);
+                    should_exit = trapped;
                 }
-                JsWorkerRequest::DisconnectClient(client_id) => {
+                JsWorkerRequest::DisconnectClient { reply_tx, client_id } => {
                     let mut trapped = false;
                     let res = ModuleHost::disconnect_client_inner(client_id, info, call_reducer, &mut trapped);
-                    reply("disconnect_client", DisconnectClient(res), trapped);
+                    send_worker_reply("disconnect_client", reply_tx, res, trapped);
+                    should_exit = trapped;
                 }
-                JsWorkerRequest::InitDatabase(program) => {
+                JsWorkerRequest::InitDatabase { reply_tx, program } => {
                     let (res, trapped): (Result<Option<ReducerCallResult>, anyhow::Error>, bool) =
                         init_database(replica_ctx, &module_common.info().module_def, program, call_reducer);
-                    reply("init_database", InitDatabase(Box::new(res)), trapped);
+                    send_worker_reply("init_database", reply_tx, res, trapped);
+                    should_exit = trapped;
                 }
-                JsWorkerRequest::CallScheduledFunction(params) => {
+                JsWorkerRequest::CallScheduledFunction { reply_tx, params } => {
                     let (res, trapped) = instance_common
                         .call_scheduled_function(params, &mut inst)
                         .now_or_never()
                         .expect("our call_procedure implementation is not actually async");
-                    reply("call_scheduled_function", CallScheduledFunction(res), trapped);
+                    send_worker_reply("call_scheduled_function", reply_tx, res, trapped);
+                    should_exit = trapped;
                 }
             }
+
+            // Once a JS instance traps, we must not let later queued work execute
+            // on that poisoned isolate. We reply to the trapping request first so
+            // the caller can observe the actual reducer/procedure error, and then
+            // shut the worker down so later callers retry on a fresh instance.
+            if should_exit {
+                break;
+            }
         }
     });
 
@@ -753,9 +1095,9 @@ async fn spawn_instance_worker(
     let res: Result<ModuleCommon, anyhow::Error> = result_rx.await.expect("should have a sender");
     res.map(|opt_mc| {
         let inst = JsInstance {
+            id: NEXT_JS_INSTANCE_ID.fetch_add(1, Ordering::Relaxed),
            request_tx,
-            reply_rx,
-            trapped: false,
+            trapped: Arc::new(AtomicBool::new(false)),
        };
        (opt_mc, inst)
    })
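Finally, a compact sketch of the reply-then-exit ordering on a trap, assuming the flume crate: the trapping caller still receives its typed result, and only later queued work is refused by the hung-up channel, at which point callers observe the `WorkerDisconnected` path and the lane installs a fresh worker.

fn worker_loop(rx: flume::Receiver<(flume::Sender<u32>, bool)>) {
    for (reply_tx, traps) in rx.iter() {
        let _ = reply_tx.send(42); // reply first, even when this call trapped
        if traps {
            break; // refuse to run later work on a poisoned isolate
        }
    }
    // Dropping `rx` hangs up the channel; waiting senders get an error,
    // which the lane maps to its worker-disconnected error.
}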