diff --git a/crates/core/src/host/module_host.rs b/crates/core/src/host/module_host.rs index 25e3b4560..5f2b1f972 100644 --- a/crates/core/src/host/module_host.rs +++ b/crates/core/src/host/module_host.rs @@ -12,7 +12,7 @@ use crate::estimation::{check_row_limit, estimate_rows_scanned}; use crate::hash::Hash; use crate::host::host_controller::CallProcedureReturn; use crate::host::scheduler::{CallScheduledFunctionError, CallScheduledFunctionResult, ScheduledFunctionParams}; -use crate::host::v8::JsInstance; +use crate::host::v8::{JsMainInstance, JsProcedureInstance}; pub use crate::host::wasm_common::module_host_actor::{InstanceCommon, WasmInstance}; use crate::host::wasmtime::ModuleInstance; use crate::host::{InvalidFunctionArguments, InvalidViewArguments}; @@ -345,7 +345,7 @@ pub enum ModuleWithInstance { }, Js { module: super::v8::JsModule, - init_inst: super::v8::JsInstance, + init_inst: super::v8::JsMainInstance, }, } @@ -361,7 +361,7 @@ struct WasmtimeModuleHost { struct V8ModuleHost { module: super::v8::JsModule, - main_instance: SharedJsInstanceManager, + main_instance: SharedJsMainInstanceManager, procedure_instances: ModuleInstanceManager, } @@ -407,7 +407,7 @@ impl GenericModule for super::wasmtime::Module { } impl GenericModule for super::v8::JsModule { - type Instance = super::v8::JsInstance; + type Instance = super::v8::JsProcedureInstance; async fn create_instance(&self) -> Self::Instance { self.create_instance().await } @@ -416,14 +416,10 @@ impl GenericModule for super::v8::JsModule { } } -impl GenericModuleInstance for super::v8::JsInstance { +impl GenericModuleInstance for super::v8::JsProcedureInstance { fn trapped(&self) -> bool { self.trapped() } - - fn needs_replacement(&self) -> bool { - self.needs_replacement() - } } /// Creates the table for `table_def` in `stdb`. @@ -763,9 +759,9 @@ impl CallProcedureParams { /// /// Capable of managing and allocating multiple instances of the same module, /// but this functionality is currently unused, as only one reducer runs at a time. -/// When we introduce procedures, it will be necessary to have multiple instances, -/// as each procedure invocation will have its own sandboxed instance, -/// and multiple procedures can run concurrently with up to one reducer. +/// Procedures need multiple instances, as each procedure invocation has its own +/// sandboxed instance and multiple procedures can run concurrently with up to +/// one reducer. struct ModuleInstanceManager { instances: Mutex>, module: M, @@ -778,8 +774,8 @@ struct ModuleInstanceManager { /// clone of the active instance handle immediately and enqueue work on its /// sender. Replacement is serialized only after that active instance traps or /// otherwise becomes unusable. -struct SharedJsInstanceManager { - active: RwLock, +struct SharedJsMainInstanceManager { + active: RwLock, module: super::v8::JsModule, metrics: InstanceManagerMetrics, replace_lock: AsyncMutex<()>, @@ -960,8 +956,8 @@ impl ModuleInstanceManager { } } -impl SharedJsInstanceManager { - fn new(module: super::v8::JsModule, init_inst: JsInstance, metrics: InstanceManagerMetrics) -> Self { +impl SharedJsMainInstanceManager { + fn new(module: super::v8::JsModule, init_inst: JsMainInstance, metrics: InstanceManagerMetrics) -> Self { metrics.track_initial_instance(); Self { active: RwLock::new(init_inst), @@ -971,7 +967,7 @@ impl SharedJsInstanceManager { } } - async fn with_instance(&self, f: impl AsyncFnOnce(JsInstance) -> R) -> R { + async fn with_instance(&self, f: impl AsyncFnOnce(JsMainInstance) -> R) -> R { let inst = self.get_instance().await; let observed = inst.clone(); let res = f(inst).await; @@ -979,7 +975,7 @@ impl SharedJsInstanceManager { res } - async fn get_instance(&self) -> JsInstance { + async fn get_instance(&self) -> JsMainInstance { let inst = self.active.read().clone(); if !inst.needs_replacement() { return inst; @@ -989,7 +985,7 @@ impl SharedJsInstanceManager { self.active.read().clone() } - async fn replace_if_needed(&self, observed: &JsInstance) { + async fn replace_if_needed(&self, observed: &JsMainInstance) { if !observed.needs_replacement() { return; } @@ -1222,7 +1218,7 @@ impl ModuleHost { info = module.info(); let metrics = InstanceManagerMetrics::new(module.host_type(), database_identity); let host_module = module.clone(); - let main_instance = SharedJsInstanceManager::new(module.clone(), init_inst, metrics.clone()); + let main_instance = SharedJsMainInstanceManager::new(module.clone(), init_inst, metrics.clone()); let procedure_instances = ModuleInstanceManager::new_with_metrics(module, None, metrics); Arc::new(ModuleHostInner::Js(Box::new(V8ModuleHost { module: host_module, @@ -1345,7 +1341,7 @@ impl ModuleHost { arg: A, timer: impl FnOnce(&str) -> Guard, work_wasm: impl AsyncFnOnce(Guard, &SingleCoreExecutor, Box, A) -> (R, Box), - work_js: impl AsyncFnOnce(Guard, &JsInstance, A) -> R, + work_js: impl AsyncFnOnce(Guard, &JsMainInstance, A) -> R, ) -> Result { self.guard_closed()?; let timer_guard = timer(label); @@ -1388,7 +1384,7 @@ impl ModuleHost { label: &str, arg: A, wasm: impl AsyncFnOnce(A, &mut ModuleInstance) -> R + Send + 'static, - js: impl AsyncFnOnce(A, &JsInstance) -> R, + js: impl AsyncFnOnce(A, &JsMainInstance) -> R, ) -> Result where R: Send + 'static, @@ -1431,7 +1427,7 @@ impl ModuleHost { label: &str, arg: A, wasm: impl AsyncFnOnce(A, &mut ModuleInstance) -> R + Send + 'static, - js: impl AsyncFnOnce(A, &JsInstance) -> R, + js: impl AsyncFnOnce(A, &JsProcedureInstance) -> R, ) -> Result where R: Send + 'static, @@ -1471,7 +1467,7 @@ impl ModuleHost { } async fn call_view_command(&self, label: &str, cmd: ViewCommand) -> Result { - self.call_pooled( + self.call( label, cmd, async |cmd, inst| Ok::<_, ViewCallError>(inst.call_view(cmd)), diff --git a/crates/core/src/host/v8/mod.rs b/crates/core/src/host/v8/mod.rs index 972116658..e07c66dac 100644 --- a/crates/core/src/host/v8/mod.rs +++ b/crates/core/src/host/v8/mod.rs @@ -114,7 +114,7 @@ impl V8Runtime { static V8_RUNTIME_GLOBAL: LazyLock = LazyLock::new(V8RuntimeInner::init); const REDUCER_ARGS_BUFFER_SIZE: usize = 4_096; const JS_MAIN_INSTANCE_QUEUE_CAPACITY: usize = 1024; -const JS_POOLED_INSTANCE_QUEUE_CAPACITY: usize = 1; +const JS_PROCEDURE_INSTANCE_QUEUE_CAPACITY: usize = 1; pub(crate) const V8_WORKER_KIND_MAIN: &str = "main"; thread_local! { @@ -156,7 +156,7 @@ impl Drop for EnteredJsModuleThread { #[derive(Copy, Clone)] enum JsWorkerKind { Main, - Pooled, + Procedure, } impl JsWorkerKind { @@ -167,7 +167,7 @@ impl JsWorkerKind { const fn request_queue_capacity(self) -> usize { match self { Self::Main => JS_MAIN_INSTANCE_QUEUE_CAPACITY, - Self::Pooled => JS_POOLED_INSTANCE_QUEUE_CAPACITY, + Self::Procedure => JS_PROCEDURE_INSTANCE_QUEUE_CAPACITY, } } } @@ -238,13 +238,12 @@ impl V8RuntimeInner { let mcc = Either::Right(mcc); let load_balance_guard = Arc::new(core.guard); let core_pinner = core.pinner; - let (common, init_inst) = spawn_instance_worker( + let (common, init_inst) = spawn_main_instance_worker( program.clone(), mcc, load_balance_guard.clone(), core_pinner.clone(), heap_policy, - JsWorkerKind::Main, ) .await?; let module = JsModule { @@ -281,7 +280,7 @@ impl JsModule { self.common.info().clone() } - async fn create_instance_with_kind(&self, worker_kind: JsWorkerKind) -> JsInstance { + async fn create_procedure_instance(&self) -> JsProcedureInstance { let program = self.program.clone(); let common = self.common.clone(); let load_balance_guard = self.load_balance_guard.clone(); @@ -289,25 +288,39 @@ impl JsModule { let heap_policy = self.heap_policy; // This has to be done in a blocking context because of `blocking_recv`. - let (_, instance) = spawn_instance_worker( + let (_, instance) = spawn_procedure_instance_worker( program, Either::Left(common), load_balance_guard, core_pinner, heap_policy, - worker_kind, ) .await - .expect("`spawn_instance_worker` should succeed when passed `ModuleCommon`"); + .expect("`spawn_procedure_instance_worker` should succeed when passed `ModuleCommon`"); instance } - pub async fn create_instance(&self) -> JsInstance { - self.create_instance_with_kind(JsWorkerKind::Pooled).await + pub async fn create_instance(&self) -> JsProcedureInstance { + self.create_procedure_instance().await } - pub(in crate::host) async fn create_main_instance(&self) -> JsInstance { - self.create_instance_with_kind(JsWorkerKind::Main).await + pub(in crate::host) async fn create_main_instance(&self) -> JsMainInstance { + let program = self.program.clone(); + let common = self.common.clone(); + let load_balance_guard = self.load_balance_guard.clone(); + let core_pinner = self.core_pinner.clone(); + let heap_policy = self.heap_policy; + + let (_, instance) = spawn_main_instance_worker( + program, + Either::Left(common), + load_balance_guard, + core_pinner, + heap_policy, + ) + .await + .expect("`spawn_main_instance_worker` should succeed when passed `ModuleCommon`"); + instance } } @@ -321,7 +334,7 @@ fn env_on_isolate_unwrap(isolate: &mut Isolate) -> &mut JsInstanceEnv { env_on_isolate(isolate).expect("there should be a `JsInstanceEnv`") } -/// The environment of a [`JsInstance`]. +/// The environment bound to a JS worker isolate. struct JsInstanceEnv { instance_env: InstanceEnv, module_def: Option>, @@ -425,7 +438,7 @@ impl JsInstanceEnv { } } -/// An instance for a [`JsModule`]. +/// The main instance for a [`JsModule`]. /// /// The actual work happens in a worker thread, /// which the instance communicates with through channels. @@ -439,12 +452,21 @@ impl JsInstanceEnv { /// which will cause the worker's loop to terminate and cleanup the isolate /// and friends. #[derive(Clone)] -pub struct JsInstance { - tx: mpsc::Sender, +pub struct JsMainInstance { + tx: mpsc::Sender, trapped: Arc, } -impl JsInstance { +/// A procedure instance for a [`JsModule`]. +/// +/// Procedure instances are checked out exclusively from the procedure pool and +/// only execute procedure-style requests. +pub struct JsProcedureInstance { + tx: mpsc::Sender, + trapped: Arc, +} + +impl JsMainInstance { pub fn trapped(&self) -> bool { self.trapped.load(Ordering::Relaxed) } @@ -455,17 +477,9 @@ impl JsInstance { async fn send_request( &self, - request: impl FnOnce(JsReplyTx) -> JsWorkerRequest, + request: impl FnOnce(JsReplyTx) -> JsMainWorkerRequest, ) -> Result { - let (reply_tx, reply_rx) = oneshot::channel(); - self.tx.send(request(reply_tx)).await.map_err(|_| { - self.trapped.store(true, Ordering::Relaxed); - JsWorkerDisconnected - })?; - reply_rx.await.map_err(|_| { - self.trapped.store(true, Ordering::Relaxed); - JsWorkerDisconnected - }) + send_js_request(&self.tx, &self.trapped, request).await } pub async fn run_on_thread(&self, f: F) -> R @@ -478,7 +492,7 @@ impl JsInstance { let (tx, rx) = oneshot::channel(); self.tx - .send(JsWorkerRequest::RunFunction(Box::new(move || { + .send(JsMainWorkerRequest::RunFunction(Box::new(move || { async move { let _on_js_module_thread = EnteredJsModuleThread::new(); let result = AssertUnwindSafe(f().instrument(span)).catch_unwind().await; @@ -509,7 +523,7 @@ impl JsInstance { old_module_info: Arc, policy: MigrationPolicy, ) -> anyhow::Result { - self.send_request(|reply_tx| JsWorkerRequest::UpdateDatabase { + self.send_request(|reply_tx| JsMainWorkerRequest::UpdateDatabase { reply_tx, program, old_module_info, @@ -520,13 +534,13 @@ impl JsInstance { } pub async fn call_reducer(&self, params: CallReducerParams) -> Result { - self.send_request(|reply_tx| JsWorkerRequest::CallReducer { reply_tx, params }) + self.send_request(|reply_tx| JsMainWorkerRequest::CallReducer { reply_tx, params }) .await .map_err(|_| ReducerCallError::WorkerError(js_worker_disconnected_error("call_reducer"))) } pub async fn clear_all_clients(&self) -> anyhow::Result<()> { - self.send_request(JsWorkerRequest::ClearAllClients) + self.send_request(JsMainWorkerRequest::ClearAllClients) .await .map_err(|_| anyhow::anyhow!(js_worker_disconnected_error("clear_all_clients")))? } @@ -536,7 +550,7 @@ impl JsInstance { caller_auth: ConnectionAuthCtx, caller_connection_id: ConnectionId, ) -> Result<(), ClientConnectedError> { - self.send_request(|reply_tx| JsWorkerRequest::CallIdentityConnected { + self.send_request(|reply_tx| JsMainWorkerRequest::CallIdentityConnected { reply_tx, caller_auth, caller_connection_id, @@ -554,7 +568,7 @@ impl JsInstance { caller_identity: Identity, caller_connection_id: ConnectionId, ) -> Result<(), ReducerCallError> { - self.send_request(|reply_tx| JsWorkerRequest::CallIdentityDisconnected { + self.send_request(|reply_tx| JsMainWorkerRequest::CallIdentityDisconnected { reply_tx, caller_identity, caller_connection_id, @@ -564,19 +578,38 @@ impl JsInstance { } pub async fn disconnect_client(&self, client_id: ClientActorId) -> Result<(), ReducerCallError> { - self.send_request(|reply_tx| JsWorkerRequest::DisconnectClient { reply_tx, client_id }) + self.send_request(|reply_tx| JsMainWorkerRequest::DisconnectClient { reply_tx, client_id }) .await .map_err(|_| ReducerCallError::WorkerError(js_worker_disconnected_error("disconnect_client")))? } pub async fn init_database(&self, program: Program) -> anyhow::Result> { - self.send_request(|reply_tx| JsWorkerRequest::InitDatabase { reply_tx, program }) + self.send_request(|reply_tx| JsMainWorkerRequest::InitDatabase { reply_tx, program }) .await .map_err(|_| anyhow::anyhow!(js_worker_disconnected_error("init_database")))? } + pub async fn call_view(&self, cmd: ViewCommand) -> Result { + self.send_request(|reply_tx| JsMainWorkerRequest::CallView { reply_tx, cmd }) + .await + .map_err(|_| ViewCallError::InternalError(js_worker_disconnected_error("call_view"))) + } +} + +impl JsProcedureInstance { + pub fn trapped(&self) -> bool { + self.trapped.load(Ordering::Relaxed) + } + + async fn send_request( + &self, + request: impl FnOnce(JsReplyTx) -> JsProcedureWorkerRequest, + ) -> Result { + send_js_request(&self.tx, &self.trapped, request).await + } + pub async fn call_procedure(&self, params: CallProcedureParams) -> CallProcedureReturn { - self.send_request(|reply_tx| JsWorkerRequest::CallProcedure { reply_tx, params }) + self.send_request(|reply_tx| JsProcedureWorkerRequest::CallProcedure { reply_tx, params }) .await .unwrap_or_else(|_| CallProcedureReturn { result: Err(ProcedureCallError::InternalError(js_worker_disconnected_error( @@ -586,22 +619,35 @@ impl JsInstance { }) } - pub async fn call_view(&self, cmd: ViewCommand) -> Result { - self.send_request(|reply_tx| JsWorkerRequest::CallView { reply_tx, cmd }) - .await - .map_err(|_| ViewCallError::InternalError(js_worker_disconnected_error("call_view"))) - } - pub(in crate::host) async fn call_scheduled_function( &self, params: ScheduledFunctionParams, ) -> CallScheduledFunctionResult { - self.send_request(|reply_tx| JsWorkerRequest::CallScheduledFunction { reply_tx, params }) + self.send_request(|reply_tx| JsProcedureWorkerRequest::CallScheduledFunction { reply_tx, params }) .await .unwrap_or_else(|_| panic!("worker should stay live while calling a scheduled function")) } } +async fn send_js_request( + tx: &mpsc::Sender, + trapped: &AtomicBool, + request: impl FnOnce(JsReplyTx) -> Req, +) -> Result +where + Req: Send + 'static, +{ + let (reply_tx, reply_rx) = oneshot::channel(); + tx.send(request(reply_tx)).await.map_err(|_| { + trapped.store(true, Ordering::Relaxed); + JsWorkerDisconnected + })?; + reply_rx.await.map_err(|_| { + trapped.store(true, Ordering::Relaxed); + JsWorkerDisconnected + }) +} + #[derive(Clone, Copy, Debug)] struct JsWorkerDisconnected; @@ -611,63 +657,67 @@ fn js_worker_disconnected_error(label: &'static str) -> String { type JsReplyTx = oneshot::Sender; -/// Requests sent to the dedicated JS worker thread. +/// Requests sent to the main JS worker thread. /// /// Most variants carry a `reply_tx` because the worker thread owns the isolate, /// executes the request there, and then has to send the typed result back to /// the async caller. -enum JsWorkerRequest { - /// See [`JsInstance::run_on_thread`]. +enum JsMainWorkerRequest { + /// See [`JsMainInstance::run_on_thread`]. /// - /// This variant does not expect a [`JsWorkerReply`]. + /// This variant carries its own reply channel. RunFunction(Box LocalBoxFuture<'static, ()> + Send>), - /// See [`JsInstance::update_database`]. + /// See [`JsMainInstance::update_database`]. UpdateDatabase { reply_tx: JsReplyTx>, program: Program, old_module_info: Arc, policy: MigrationPolicy, }, - /// See [`JsInstance::call_reducer`]. + /// See [`JsMainInstance::call_reducer`]. CallReducer { reply_tx: JsReplyTx, params: CallReducerParams, }, - /// See [`JsInstance::call_view`]. + /// See [`JsMainInstance::call_view`]. CallView { reply_tx: JsReplyTx, cmd: ViewCommand, }, - /// See [`JsInstance::call_procedure`]. - CallProcedure { - reply_tx: JsReplyTx, - params: CallProcedureParams, - }, - /// See [`JsInstance::clear_all_clients`]. + /// See [`JsMainInstance::clear_all_clients`]. ClearAllClients(JsReplyTx>), - /// See [`JsInstance::call_identity_connected`]. + /// See [`JsMainInstance::call_identity_connected`]. CallIdentityConnected { reply_tx: JsReplyTx>, caller_auth: ConnectionAuthCtx, caller_connection_id: ConnectionId, }, - /// See [`JsInstance::call_identity_disconnected`]. + /// See [`JsMainInstance::call_identity_disconnected`]. CallIdentityDisconnected { reply_tx: JsReplyTx>, caller_identity: Identity, caller_connection_id: ConnectionId, }, - /// See [`JsInstance::disconnect_client`]. + /// See [`JsMainInstance::disconnect_client`]. DisconnectClient { reply_tx: JsReplyTx>, client_id: ClientActorId, }, - /// See [`JsInstance::init_database`]. + /// See [`JsMainInstance::init_database`]. InitDatabase { reply_tx: JsReplyTx>>, program: Program, }, - /// See [`JsInstance::call_scheduled_function`]. +} + +/// Requests sent to a procedure JS worker thread. +enum JsProcedureWorkerRequest { + /// See [`JsProcedureInstance::call_procedure`]. + CallProcedure { + reply_tx: JsReplyTx, + params: CallProcedureParams, + }, + /// See [`JsProcedureInstance::call_scheduled_function`]. CallScheduledFunction { reply_tx: JsReplyTx, params: ScheduledFunctionParams, @@ -850,7 +900,7 @@ fn should_retire_worker_for_heap( } } -/// Performs some of the startup work of [`spawn_instance_worker`]. +/// Performs some of the shared startup work for JS worker isolates. /// /// NOTE(centril): in its own function due to lack of `try` blocks. fn startup_instance_worker<'scope>( @@ -929,29 +979,222 @@ where f(&mut guard) } -/// Spawns an instance worker for `program` -/// and returns on success the corresponding [`JsInstance`] -/// that talks to the worker. +async fn spawn_main_instance_worker( + program: Arc, + module_or_mcc: Either, + load_balance_guard: Arc, + core_pinner: CorePinner, + heap_policy: V8HeapPolicyConfig, +) -> anyhow::Result<(ModuleCommon, JsMainInstance)> { + spawn_instance_worker::(program, module_or_mcc, load_balance_guard, core_pinner, heap_policy).await +} + +async fn spawn_procedure_instance_worker( + program: Arc, + module_or_mcc: Either, + load_balance_guard: Arc, + core_pinner: CorePinner, + heap_policy: V8HeapPolicyConfig, +) -> anyhow::Result<(ModuleCommon, JsProcedureInstance)> { + spawn_instance_worker::(program, module_or_mcc, load_balance_guard, core_pinner, heap_policy) + .await +} + +struct MainJsWorker; + +struct ProcedureJsWorker; + +trait JsWorkerSpec { + type Request: Send + 'static; + type Instance; + + const KIND: JsWorkerKind; + + fn make_instance(tx: mpsc::Sender, trapped: Arc) -> Self::Instance; + + fn handle_request( + request: Self::Request, + rt: &tokio::runtime::Handle, + instance_common: &mut InstanceCommon, + inst: &mut V8Instance<'_, '_, '_>, + module_common: &ModuleCommon, + replica_ctx: &Arc, + ) -> bool; +} + +impl JsWorkerSpec for MainJsWorker { + type Request = JsMainWorkerRequest; + type Instance = JsMainInstance; + + const KIND: JsWorkerKind = JsWorkerKind::Main; + + fn make_instance(tx: mpsc::Sender, trapped: Arc) -> Self::Instance { + JsMainInstance { tx, trapped } + } + + fn handle_request( + request: Self::Request, + rt: &tokio::runtime::Handle, + instance_common: &mut InstanceCommon, + inst: &mut V8Instance<'_, '_, '_>, + module_common: &ModuleCommon, + replica_ctx: &Arc, + ) -> bool { + handle_main_worker_request(request, rt, instance_common, inst, module_common, replica_ctx) + } +} + +impl JsWorkerSpec for ProcedureJsWorker { + type Request = JsProcedureWorkerRequest; + type Instance = JsProcedureInstance; + + const KIND: JsWorkerKind = JsWorkerKind::Procedure; + + fn make_instance(tx: mpsc::Sender, trapped: Arc) -> Self::Instance { + JsProcedureInstance { tx, trapped } + } + + fn handle_request( + request: Self::Request, + _rt: &tokio::runtime::Handle, + instance_common: &mut InstanceCommon, + inst: &mut V8Instance<'_, '_, '_>, + _module_common: &ModuleCommon, + _replica_ctx: &Arc, + ) -> bool { + handle_procedure_worker_request(request, instance_common, inst) + } +} + +fn handle_main_worker_request( + request: JsMainWorkerRequest, + rt: &tokio::runtime::Handle, + instance_common: &mut InstanceCommon, + inst: &mut V8Instance<'_, '_, '_>, + module_common: &ModuleCommon, + replica_ctx: &Arc, +) -> bool { + let info = module_common.info(); + let mut call_reducer = |tx, params| instance_common.call_reducer_with_tx(tx, params, inst); + let mut should_exit = false; + + match request { + JsMainWorkerRequest::RunFunction(f) => rt.block_on(f()), + JsMainWorkerRequest::UpdateDatabase { + reply_tx, + program, + old_module_info, + policy, + } => { + let res = instance_common.update_database(program, old_module_info, policy, inst); + send_worker_reply("update_database", reply_tx, res); + } + JsMainWorkerRequest::CallReducer { reply_tx, params } => { + let (res, trapped) = call_reducer(None, params); + send_worker_reply("call_reducer", reply_tx, res); + should_exit = trapped; + } + JsMainWorkerRequest::CallView { reply_tx, cmd } => { + let (res, trapped) = instance_common.handle_cmd(cmd, inst); + send_worker_reply("call_view", reply_tx, res); + should_exit = trapped; + } + JsMainWorkerRequest::ClearAllClients(reply_tx) => { + let res = instance_common.clear_all_clients(); + send_worker_reply("clear_all_clients", reply_tx, res); + } + JsMainWorkerRequest::CallIdentityConnected { + reply_tx, + caller_auth, + caller_connection_id, + } => { + let mut trapped = false; + let res = call_identity_connected(caller_auth, caller_connection_id, &info, call_reducer, &mut trapped); + send_worker_reply("call_identity_connected", reply_tx, res); + should_exit = trapped; + } + JsMainWorkerRequest::CallIdentityDisconnected { + reply_tx, + caller_identity, + caller_connection_id, + } => { + let mut trapped = false; + let res = ModuleHost::call_identity_disconnected_inner( + caller_identity, + caller_connection_id, + &info, + call_reducer, + &mut trapped, + ); + send_worker_reply("call_identity_disconnected", reply_tx, res); + should_exit = trapped; + } + JsMainWorkerRequest::DisconnectClient { reply_tx, client_id } => { + let mut trapped = false; + let res = ModuleHost::disconnect_client_inner(client_id, &info, call_reducer, &mut trapped); + send_worker_reply("disconnect_client", reply_tx, res); + should_exit = trapped; + } + JsMainWorkerRequest::InitDatabase { reply_tx, program } => { + let (res, trapped): (Result, anyhow::Error>, bool) = + init_database(replica_ctx, &info.module_def, program, call_reducer); + send_worker_reply("init_database", reply_tx, res); + should_exit = trapped; + } + } + + should_exit +} + +fn handle_procedure_worker_request( + request: JsProcedureWorkerRequest, + instance_common: &mut InstanceCommon, + inst: &mut V8Instance<'_, '_, '_>, +) -> bool { + match request { + JsProcedureWorkerRequest::CallProcedure { reply_tx, params } => { + let (res, trapped) = instance_common + .call_procedure(params, inst) + .now_or_never() + .expect("our call_procedure implementation is not actually async"); + send_worker_reply("call_procedure", reply_tx, res); + trapped + } + JsProcedureWorkerRequest::CallScheduledFunction { reply_tx, params } => { + let (res, trapped) = instance_common + .call_scheduled_function(params, inst) + .now_or_never() + .expect("our call_procedure implementation is not actually async"); + send_worker_reply("call_scheduled_function", reply_tx, res); + trapped + } + } +} + +/// Spawns an instance worker for `program` and returns on success the +/// corresponding instance handle that talks to the worker. /// -/// When [`ModuleCommon`] is passed, it's assumed that `spawn_instance_worker` -/// has already happened once for this `program` and that it has been -/// validated. In that case, `Ok(_)` should be returned. +/// When [`ModuleCommon`] is passed, it's assumed that this program has already +/// been validated. In that case, `Ok(_)` should be returned. /// -/// Otherwise, when [`ModuleCreationContext`] is passed, -/// this is the first time both the module and instance are created. +/// Otherwise, when [`ModuleCreationContext`] is passed, this is the first time +/// both the module and instance are created. /// /// `load_balance_guard` and `core_pinner` should both be from the same /// [`AllocatedJobCore`], and are used to manage the core pinning of this thread. -async fn spawn_instance_worker( +async fn spawn_instance_worker( program: Arc, module_or_mcc: Either, load_balance_guard: Arc, mut core_pinner: CorePinner, heap_policy: V8HeapPolicyConfig, - worker_kind: JsWorkerKind, -) -> anyhow::Result<(ModuleCommon, JsInstance)> { +) -> anyhow::Result<(ModuleCommon, W::Instance)> +where + W: JsWorkerSpec + 'static, +{ // This one-shot channel is used for initial startup error handling within the thread. let (result_tx, result_rx) = oneshot::channel(); + let worker_kind = W::KIND; let (request_tx, mut request_rx) = mpsc::channel(worker_kind.request_queue_capacity()); let trapped = Arc::new(AtomicBool::new(false)); let trapped_in_thread = trapped.clone(); @@ -1039,121 +1282,21 @@ async fn spawn_instance_worker( // Process requests to the worker. // - // The loop is terminated when the last `JsInstance` handle is dropped. + // The loop is terminated when the last worker instance handle is dropped. // This will cause channels, scopes, and the isolate to be cleaned up. let mut requests_since_heap_check = 0u64; let mut last_heap_check_at = Instant::now(); while let Some(request) = request_rx.blocking_recv() { - let mut call_reducer = |tx, params| instance_common.call_reducer_with_tx(tx, params, &mut inst); - let mut should_exit = false; - core_pinner.pin_if_changed(); - match request { - JsWorkerRequest::RunFunction(f) => rt.block_on(f()), - JsWorkerRequest::UpdateDatabase { - reply_tx, - program, - old_module_info, - policy, - } => { - let res = instance_common.update_database(program, old_module_info, policy, &mut inst); - send_worker_reply("update_database", reply_tx, res); - } - JsWorkerRequest::CallReducer { reply_tx, params } => { - let (res, trapped) = call_reducer(None, params); - if trapped { - trapped_in_thread.store(true, Ordering::Relaxed); - } - send_worker_reply("call_reducer", reply_tx, res); - should_exit = trapped; - } - JsWorkerRequest::CallView { reply_tx, cmd } => { - let (res, trapped) = instance_common.handle_cmd(cmd, &mut inst); - if trapped { - trapped_in_thread.store(true, Ordering::Relaxed); - } - send_worker_reply("call_view", reply_tx, res); - should_exit = trapped; - } - JsWorkerRequest::CallProcedure { reply_tx, params } => { - let (res, trapped) = instance_common - .call_procedure(params, &mut inst) - .now_or_never() - .expect("our call_procedure implementation is not actually async"); - if trapped { - trapped_in_thread.store(true, Ordering::Relaxed); - } - send_worker_reply("call_procedure", reply_tx, res); - should_exit = trapped; - } - JsWorkerRequest::ClearAllClients(reply_tx) => { - let res = instance_common.clear_all_clients(); - send_worker_reply("clear_all_clients", reply_tx, res); - } - JsWorkerRequest::CallIdentityConnected { - reply_tx, - caller_auth, - caller_connection_id, - } => { - let mut trapped = false; - let res = - call_identity_connected(caller_auth, caller_connection_id, info, call_reducer, &mut trapped); - if trapped { - trapped_in_thread.store(true, Ordering::Relaxed); - } - send_worker_reply("call_identity_connected", reply_tx, res); - should_exit = trapped; - } - JsWorkerRequest::CallIdentityDisconnected { - reply_tx, - caller_identity, - caller_connection_id, - } => { - let mut trapped = false; - let res = ModuleHost::call_identity_disconnected_inner( - caller_identity, - caller_connection_id, - info, - call_reducer, - &mut trapped, - ); - if trapped { - trapped_in_thread.store(true, Ordering::Relaxed); - } - send_worker_reply("call_identity_disconnected", reply_tx, res); - should_exit = trapped; - } - JsWorkerRequest::DisconnectClient { reply_tx, client_id } => { - let mut trapped = false; - let res = ModuleHost::disconnect_client_inner(client_id, info, call_reducer, &mut trapped); - if trapped { - trapped_in_thread.store(true, Ordering::Relaxed); - } - send_worker_reply("disconnect_client", reply_tx, res); - should_exit = trapped; - } - JsWorkerRequest::InitDatabase { reply_tx, program } => { - let (res, trapped): (Result, anyhow::Error>, bool) = - init_database(replica_ctx, &module_common.info().module_def, program, call_reducer); - if trapped { - trapped_in_thread.store(true, Ordering::Relaxed); - } - send_worker_reply("init_database", reply_tx, res); - should_exit = trapped; - } - JsWorkerRequest::CallScheduledFunction { reply_tx, params } => { - let (res, trapped) = instance_common - .call_scheduled_function(params, &mut inst) - .now_or_never() - .expect("our call_procedure implementation is not actually async"); - if trapped { - trapped_in_thread.store(true, Ordering::Relaxed); - } - send_worker_reply("call_scheduled_function", reply_tx, res); - should_exit = trapped; - } - } + let mut should_exit = W::handle_request( + request, + &rt, + &mut instance_common, + &mut inst, + &module_common, + replica_ctx, + ); if !should_exit && let Some(heap_metrics) = heap_metrics.as_mut() { let request_check_due = heap_policy.heap_check_request_interval.is_some_and(|interval| { @@ -1167,7 +1310,6 @@ async fn spawn_instance_worker( requests_since_heap_check = 0; last_heap_check_at = Instant::now(); if let Some((used, limit)) = should_retire_worker_for_heap(inst.scope, heap_metrics, heap_policy) { - trapped_in_thread.store(true, Ordering::Relaxed); should_exit = true; log::warn!( "retiring JS worker after V8 heap stayed high post-GC: used={}MiB limit={}MiB", @@ -1189,6 +1331,7 @@ async fn spawn_instance_worker( // this gives it a bounded-memory replacement policy instead of // letting one isolate absorb the entire module lifetime. if should_exit { + trapped_in_thread.store(true, Ordering::Relaxed); break; } } @@ -1197,10 +1340,7 @@ async fn spawn_instance_worker( // Get the module, if any, and get any setup errors from the worker. let res: Result = result_rx.await.expect("should have a sender"); res.map(|opt_mc| { - let inst = JsInstance { - tx: request_tx, - trapped, - }; + let inst = W::make_instance(request_tx, trapped); (opt_mc, inst) }) }