Define two js instance/worker types

One for procedures and one for everything else.
This commit is contained in:
joshua-spacetime
2026-05-04 13:06:12 -07:00
parent 8585478d36
commit 181663fc39
2 changed files with 348 additions and 212 deletions
+20 -24
View File
@@ -12,7 +12,7 @@ use crate::estimation::{check_row_limit, estimate_rows_scanned};
use crate::hash::Hash;
use crate::host::host_controller::CallProcedureReturn;
use crate::host::scheduler::{CallScheduledFunctionError, CallScheduledFunctionResult, ScheduledFunctionParams};
use crate::host::v8::JsInstance;
use crate::host::v8::{JsMainInstance, JsProcedureInstance};
pub use crate::host::wasm_common::module_host_actor::{InstanceCommon, WasmInstance};
use crate::host::wasmtime::ModuleInstance;
use crate::host::{InvalidFunctionArguments, InvalidViewArguments};
@@ -345,7 +345,7 @@ pub enum ModuleWithInstance {
},
Js {
module: super::v8::JsModule,
init_inst: super::v8::JsInstance,
init_inst: super::v8::JsMainInstance,
},
}
@@ -361,7 +361,7 @@ struct WasmtimeModuleHost {
struct V8ModuleHost {
module: super::v8::JsModule,
main_instance: SharedJsInstanceManager,
main_instance: SharedJsMainInstanceManager,
procedure_instances: ModuleInstanceManager<super::v8::JsModule>,
}
@@ -407,7 +407,7 @@ impl GenericModule for super::wasmtime::Module {
}
impl GenericModule for super::v8::JsModule {
type Instance = super::v8::JsInstance;
type Instance = super::v8::JsProcedureInstance;
async fn create_instance(&self) -> Self::Instance {
self.create_instance().await
}
@@ -416,14 +416,10 @@ impl GenericModule for super::v8::JsModule {
}
}
impl GenericModuleInstance for super::v8::JsInstance {
impl GenericModuleInstance for super::v8::JsProcedureInstance {
fn trapped(&self) -> bool {
self.trapped()
}
fn needs_replacement(&self) -> bool {
self.needs_replacement()
}
}
/// Creates the table for `table_def` in `stdb`.
@@ -763,9 +759,9 @@ impl CallProcedureParams {
///
/// Capable of managing and allocating multiple instances of the same module,
/// but this functionality is currently unused, as only one reducer runs at a time.
/// When we introduce procedures, it will be necessary to have multiple instances,
/// as each procedure invocation will have its own sandboxed instance,
/// and multiple procedures can run concurrently with up to one reducer.
/// Procedures need multiple instances, as each procedure invocation has its own
/// sandboxed instance and multiple procedures can run concurrently with up to
/// one reducer.
struct ModuleInstanceManager<M: GenericModule> {
instances: Mutex<VecDeque<M::Instance>>,
module: M,
@@ -778,8 +774,8 @@ struct ModuleInstanceManager<M: GenericModule> {
/// clone of the active instance handle immediately and enqueue work on its
/// sender. Replacement is serialized only after that active instance traps or
/// otherwise becomes unusable.
struct SharedJsInstanceManager {
active: RwLock<JsInstance>,
struct SharedJsMainInstanceManager {
active: RwLock<JsMainInstance>,
module: super::v8::JsModule,
metrics: InstanceManagerMetrics,
replace_lock: AsyncMutex<()>,
@@ -960,8 +956,8 @@ impl<M: GenericModule> ModuleInstanceManager<M> {
}
}
impl SharedJsInstanceManager {
fn new(module: super::v8::JsModule, init_inst: JsInstance, metrics: InstanceManagerMetrics) -> Self {
impl SharedJsMainInstanceManager {
fn new(module: super::v8::JsModule, init_inst: JsMainInstance, metrics: InstanceManagerMetrics) -> Self {
metrics.track_initial_instance();
Self {
active: RwLock::new(init_inst),
@@ -971,7 +967,7 @@ impl SharedJsInstanceManager {
}
}
async fn with_instance<R>(&self, f: impl AsyncFnOnce(JsInstance) -> R) -> R {
async fn with_instance<R>(&self, f: impl AsyncFnOnce(JsMainInstance) -> R) -> R {
let inst = self.get_instance().await;
let observed = inst.clone();
let res = f(inst).await;
@@ -979,7 +975,7 @@ impl SharedJsInstanceManager {
res
}
async fn get_instance(&self) -> JsInstance {
async fn get_instance(&self) -> JsMainInstance {
let inst = self.active.read().clone();
if !inst.needs_replacement() {
return inst;
@@ -989,7 +985,7 @@ impl SharedJsInstanceManager {
self.active.read().clone()
}
async fn replace_if_needed(&self, observed: &JsInstance) {
async fn replace_if_needed(&self, observed: &JsMainInstance) {
if !observed.needs_replacement() {
return;
}
@@ -1222,7 +1218,7 @@ impl ModuleHost {
info = module.info();
let metrics = InstanceManagerMetrics::new(module.host_type(), database_identity);
let host_module = module.clone();
let main_instance = SharedJsInstanceManager::new(module.clone(), init_inst, metrics.clone());
let main_instance = SharedJsMainInstanceManager::new(module.clone(), init_inst, metrics.clone());
let procedure_instances = ModuleInstanceManager::new_with_metrics(module, None, metrics);
Arc::new(ModuleHostInner::Js(Box::new(V8ModuleHost {
module: host_module,
@@ -1345,7 +1341,7 @@ impl ModuleHost {
arg: A,
timer: impl FnOnce(&str) -> Guard,
work_wasm: impl AsyncFnOnce(Guard, &SingleCoreExecutor, Box<ModuleInstance>, A) -> (R, Box<ModuleInstance>),
work_js: impl AsyncFnOnce(Guard, &JsInstance, A) -> R,
work_js: impl AsyncFnOnce(Guard, &JsMainInstance, A) -> R,
) -> Result<R, NoSuchModule> {
self.guard_closed()?;
let timer_guard = timer(label);
@@ -1388,7 +1384,7 @@ impl ModuleHost {
label: &str,
arg: A,
wasm: impl AsyncFnOnce(A, &mut ModuleInstance) -> R + Send + 'static,
js: impl AsyncFnOnce(A, &JsInstance) -> R,
js: impl AsyncFnOnce(A, &JsMainInstance) -> R,
) -> Result<R, NoSuchModule>
where
R: Send + 'static,
@@ -1431,7 +1427,7 @@ impl ModuleHost {
label: &str,
arg: A,
wasm: impl AsyncFnOnce(A, &mut ModuleInstance) -> R + Send + 'static,
js: impl AsyncFnOnce(A, &JsInstance) -> R,
js: impl AsyncFnOnce(A, &JsProcedureInstance) -> R,
) -> Result<R, NoSuchModule>
where
R: Send + 'static,
@@ -1471,7 +1467,7 @@ impl ModuleHost {
}
async fn call_view_command(&self, label: &str, cmd: ViewCommand) -> Result<ViewCommandResult, ViewCallError> {
self.call_pooled(
self.call(
label,
cmd,
async |cmd, inst| Ok::<_, ViewCallError>(inst.call_view(cmd)),
+328 -188
View File
@@ -114,7 +114,7 @@ impl V8Runtime {
static V8_RUNTIME_GLOBAL: LazyLock<V8RuntimeInner> = LazyLock::new(V8RuntimeInner::init);
const REDUCER_ARGS_BUFFER_SIZE: usize = 4_096;
const JS_MAIN_INSTANCE_QUEUE_CAPACITY: usize = 1024;
const JS_POOLED_INSTANCE_QUEUE_CAPACITY: usize = 1;
const JS_PROCEDURE_INSTANCE_QUEUE_CAPACITY: usize = 1;
pub(crate) const V8_WORKER_KIND_MAIN: &str = "main";
thread_local! {
@@ -156,7 +156,7 @@ impl Drop for EnteredJsModuleThread {
#[derive(Copy, Clone)]
enum JsWorkerKind {
Main,
Pooled,
Procedure,
}
impl JsWorkerKind {
@@ -167,7 +167,7 @@ impl JsWorkerKind {
const fn request_queue_capacity(self) -> usize {
match self {
Self::Main => JS_MAIN_INSTANCE_QUEUE_CAPACITY,
Self::Pooled => JS_POOLED_INSTANCE_QUEUE_CAPACITY,
Self::Procedure => JS_PROCEDURE_INSTANCE_QUEUE_CAPACITY,
}
}
}
@@ -238,13 +238,12 @@ impl V8RuntimeInner {
let mcc = Either::Right(mcc);
let load_balance_guard = Arc::new(core.guard);
let core_pinner = core.pinner;
let (common, init_inst) = spawn_instance_worker(
let (common, init_inst) = spawn_main_instance_worker(
program.clone(),
mcc,
load_balance_guard.clone(),
core_pinner.clone(),
heap_policy,
JsWorkerKind::Main,
)
.await?;
let module = JsModule {
@@ -281,7 +280,7 @@ impl JsModule {
self.common.info().clone()
}
async fn create_instance_with_kind(&self, worker_kind: JsWorkerKind) -> JsInstance {
async fn create_procedure_instance(&self) -> JsProcedureInstance {
let program = self.program.clone();
let common = self.common.clone();
let load_balance_guard = self.load_balance_guard.clone();
@@ -289,25 +288,39 @@ impl JsModule {
let heap_policy = self.heap_policy;
// This has to be done in a blocking context because of `blocking_recv`.
let (_, instance) = spawn_instance_worker(
let (_, instance) = spawn_procedure_instance_worker(
program,
Either::Left(common),
load_balance_guard,
core_pinner,
heap_policy,
worker_kind,
)
.await
.expect("`spawn_instance_worker` should succeed when passed `ModuleCommon`");
.expect("`spawn_procedure_instance_worker` should succeed when passed `ModuleCommon`");
instance
}
pub async fn create_instance(&self) -> JsInstance {
self.create_instance_with_kind(JsWorkerKind::Pooled).await
pub async fn create_instance(&self) -> JsProcedureInstance {
self.create_procedure_instance().await
}
pub(in crate::host) async fn create_main_instance(&self) -> JsInstance {
self.create_instance_with_kind(JsWorkerKind::Main).await
pub(in crate::host) async fn create_main_instance(&self) -> JsMainInstance {
let program = self.program.clone();
let common = self.common.clone();
let load_balance_guard = self.load_balance_guard.clone();
let core_pinner = self.core_pinner.clone();
let heap_policy = self.heap_policy;
let (_, instance) = spawn_main_instance_worker(
program,
Either::Left(common),
load_balance_guard,
core_pinner,
heap_policy,
)
.await
.expect("`spawn_main_instance_worker` should succeed when passed `ModuleCommon`");
instance
}
}
@@ -321,7 +334,7 @@ fn env_on_isolate_unwrap(isolate: &mut Isolate) -> &mut JsInstanceEnv {
env_on_isolate(isolate).expect("there should be a `JsInstanceEnv`")
}
/// The environment of a [`JsInstance`].
/// The environment bound to a JS worker isolate.
struct JsInstanceEnv {
instance_env: InstanceEnv,
module_def: Option<Arc<ModuleDef>>,
@@ -425,7 +438,7 @@ impl JsInstanceEnv {
}
}
/// An instance for a [`JsModule`].
/// The main instance for a [`JsModule`].
///
/// The actual work happens in a worker thread,
/// which the instance communicates with through channels.
@@ -439,12 +452,21 @@ impl JsInstanceEnv {
/// which will cause the worker's loop to terminate and cleanup the isolate
/// and friends.
#[derive(Clone)]
pub struct JsInstance {
tx: mpsc::Sender<JsWorkerRequest>,
pub struct JsMainInstance {
tx: mpsc::Sender<JsMainWorkerRequest>,
trapped: Arc<AtomicBool>,
}
impl JsInstance {
/// A procedure instance for a [`JsModule`].
///
/// Procedure instances are checked out exclusively from the procedure pool and
/// only execute procedure-style requests.
pub struct JsProcedureInstance {
tx: mpsc::Sender<JsProcedureWorkerRequest>,
trapped: Arc<AtomicBool>,
}
impl JsMainInstance {
pub fn trapped(&self) -> bool {
self.trapped.load(Ordering::Relaxed)
}
@@ -455,17 +477,9 @@ impl JsInstance {
async fn send_request<T>(
&self,
request: impl FnOnce(JsReplyTx<T>) -> JsWorkerRequest,
request: impl FnOnce(JsReplyTx<T>) -> JsMainWorkerRequest,
) -> Result<T, JsWorkerDisconnected> {
let (reply_tx, reply_rx) = oneshot::channel();
self.tx.send(request(reply_tx)).await.map_err(|_| {
self.trapped.store(true, Ordering::Relaxed);
JsWorkerDisconnected
})?;
reply_rx.await.map_err(|_| {
self.trapped.store(true, Ordering::Relaxed);
JsWorkerDisconnected
})
send_js_request(&self.tx, &self.trapped, request).await
}
pub async fn run_on_thread<F, R>(&self, f: F) -> R
@@ -478,7 +492,7 @@ impl JsInstance {
let (tx, rx) = oneshot::channel();
self.tx
.send(JsWorkerRequest::RunFunction(Box::new(move || {
.send(JsMainWorkerRequest::RunFunction(Box::new(move || {
async move {
let _on_js_module_thread = EnteredJsModuleThread::new();
let result = AssertUnwindSafe(f().instrument(span)).catch_unwind().await;
@@ -509,7 +523,7 @@ impl JsInstance {
old_module_info: Arc<ModuleInfo>,
policy: MigrationPolicy,
) -> anyhow::Result<UpdateDatabaseResult> {
self.send_request(|reply_tx| JsWorkerRequest::UpdateDatabase {
self.send_request(|reply_tx| JsMainWorkerRequest::UpdateDatabase {
reply_tx,
program,
old_module_info,
@@ -520,13 +534,13 @@ impl JsInstance {
}
pub async fn call_reducer(&self, params: CallReducerParams) -> Result<ReducerCallResult, ReducerCallError> {
self.send_request(|reply_tx| JsWorkerRequest::CallReducer { reply_tx, params })
self.send_request(|reply_tx| JsMainWorkerRequest::CallReducer { reply_tx, params })
.await
.map_err(|_| ReducerCallError::WorkerError(js_worker_disconnected_error("call_reducer")))
}
pub async fn clear_all_clients(&self) -> anyhow::Result<()> {
self.send_request(JsWorkerRequest::ClearAllClients)
self.send_request(JsMainWorkerRequest::ClearAllClients)
.await
.map_err(|_| anyhow::anyhow!(js_worker_disconnected_error("clear_all_clients")))?
}
@@ -536,7 +550,7 @@ impl JsInstance {
caller_auth: ConnectionAuthCtx,
caller_connection_id: ConnectionId,
) -> Result<(), ClientConnectedError> {
self.send_request(|reply_tx| JsWorkerRequest::CallIdentityConnected {
self.send_request(|reply_tx| JsMainWorkerRequest::CallIdentityConnected {
reply_tx,
caller_auth,
caller_connection_id,
@@ -554,7 +568,7 @@ impl JsInstance {
caller_identity: Identity,
caller_connection_id: ConnectionId,
) -> Result<(), ReducerCallError> {
self.send_request(|reply_tx| JsWorkerRequest::CallIdentityDisconnected {
self.send_request(|reply_tx| JsMainWorkerRequest::CallIdentityDisconnected {
reply_tx,
caller_identity,
caller_connection_id,
@@ -564,19 +578,38 @@ impl JsInstance {
}
pub async fn disconnect_client(&self, client_id: ClientActorId) -> Result<(), ReducerCallError> {
self.send_request(|reply_tx| JsWorkerRequest::DisconnectClient { reply_tx, client_id })
self.send_request(|reply_tx| JsMainWorkerRequest::DisconnectClient { reply_tx, client_id })
.await
.map_err(|_| ReducerCallError::WorkerError(js_worker_disconnected_error("disconnect_client")))?
}
pub async fn init_database(&self, program: Program) -> anyhow::Result<Option<ReducerCallResult>> {
self.send_request(|reply_tx| JsWorkerRequest::InitDatabase { reply_tx, program })
self.send_request(|reply_tx| JsMainWorkerRequest::InitDatabase { reply_tx, program })
.await
.map_err(|_| anyhow::anyhow!(js_worker_disconnected_error("init_database")))?
}
pub async fn call_view(&self, cmd: ViewCommand) -> Result<ViewCommandResult, ViewCallError> {
self.send_request(|reply_tx| JsMainWorkerRequest::CallView { reply_tx, cmd })
.await
.map_err(|_| ViewCallError::InternalError(js_worker_disconnected_error("call_view")))
}
}
impl JsProcedureInstance {
pub fn trapped(&self) -> bool {
self.trapped.load(Ordering::Relaxed)
}
async fn send_request<T>(
&self,
request: impl FnOnce(JsReplyTx<T>) -> JsProcedureWorkerRequest,
) -> Result<T, JsWorkerDisconnected> {
send_js_request(&self.tx, &self.trapped, request).await
}
pub async fn call_procedure(&self, params: CallProcedureParams) -> CallProcedureReturn {
self.send_request(|reply_tx| JsWorkerRequest::CallProcedure { reply_tx, params })
self.send_request(|reply_tx| JsProcedureWorkerRequest::CallProcedure { reply_tx, params })
.await
.unwrap_or_else(|_| CallProcedureReturn {
result: Err(ProcedureCallError::InternalError(js_worker_disconnected_error(
@@ -586,22 +619,35 @@ impl JsInstance {
})
}
pub async fn call_view(&self, cmd: ViewCommand) -> Result<ViewCommandResult, ViewCallError> {
self.send_request(|reply_tx| JsWorkerRequest::CallView { reply_tx, cmd })
.await
.map_err(|_| ViewCallError::InternalError(js_worker_disconnected_error("call_view")))
}
pub(in crate::host) async fn call_scheduled_function(
&self,
params: ScheduledFunctionParams,
) -> CallScheduledFunctionResult {
self.send_request(|reply_tx| JsWorkerRequest::CallScheduledFunction { reply_tx, params })
self.send_request(|reply_tx| JsProcedureWorkerRequest::CallScheduledFunction { reply_tx, params })
.await
.unwrap_or_else(|_| panic!("worker should stay live while calling a scheduled function"))
}
}
async fn send_js_request<Req, T>(
tx: &mpsc::Sender<Req>,
trapped: &AtomicBool,
request: impl FnOnce(JsReplyTx<T>) -> Req,
) -> Result<T, JsWorkerDisconnected>
where
Req: Send + 'static,
{
let (reply_tx, reply_rx) = oneshot::channel();
tx.send(request(reply_tx)).await.map_err(|_| {
trapped.store(true, Ordering::Relaxed);
JsWorkerDisconnected
})?;
reply_rx.await.map_err(|_| {
trapped.store(true, Ordering::Relaxed);
JsWorkerDisconnected
})
}
#[derive(Clone, Copy, Debug)]
struct JsWorkerDisconnected;
@@ -611,63 +657,67 @@ fn js_worker_disconnected_error(label: &'static str) -> String {
type JsReplyTx<T> = oneshot::Sender<T>;
/// Requests sent to the dedicated JS worker thread.
/// Requests sent to the main JS worker thread.
///
/// Most variants carry a `reply_tx` because the worker thread owns the isolate,
/// executes the request there, and then has to send the typed result back to
/// the async caller.
enum JsWorkerRequest {
/// See [`JsInstance::run_on_thread`].
enum JsMainWorkerRequest {
/// See [`JsMainInstance::run_on_thread`].
///
/// This variant does not expect a [`JsWorkerReply`].
/// This variant carries its own reply channel.
RunFunction(Box<dyn FnOnce() -> LocalBoxFuture<'static, ()> + Send>),
/// See [`JsInstance::update_database`].
/// See [`JsMainInstance::update_database`].
UpdateDatabase {
reply_tx: JsReplyTx<anyhow::Result<UpdateDatabaseResult>>,
program: Program,
old_module_info: Arc<ModuleInfo>,
policy: MigrationPolicy,
},
/// See [`JsInstance::call_reducer`].
/// See [`JsMainInstance::call_reducer`].
CallReducer {
reply_tx: JsReplyTx<ReducerCallResult>,
params: CallReducerParams,
},
/// See [`JsInstance::call_view`].
/// See [`JsMainInstance::call_view`].
CallView {
reply_tx: JsReplyTx<ViewCommandResult>,
cmd: ViewCommand,
},
/// See [`JsInstance::call_procedure`].
CallProcedure {
reply_tx: JsReplyTx<CallProcedureReturn>,
params: CallProcedureParams,
},
/// See [`JsInstance::clear_all_clients`].
/// See [`JsMainInstance::clear_all_clients`].
ClearAllClients(JsReplyTx<anyhow::Result<()>>),
/// See [`JsInstance::call_identity_connected`].
/// See [`JsMainInstance::call_identity_connected`].
CallIdentityConnected {
reply_tx: JsReplyTx<Result<(), ClientConnectedError>>,
caller_auth: ConnectionAuthCtx,
caller_connection_id: ConnectionId,
},
/// See [`JsInstance::call_identity_disconnected`].
/// See [`JsMainInstance::call_identity_disconnected`].
CallIdentityDisconnected {
reply_tx: JsReplyTx<Result<(), ReducerCallError>>,
caller_identity: Identity,
caller_connection_id: ConnectionId,
},
/// See [`JsInstance::disconnect_client`].
/// See [`JsMainInstance::disconnect_client`].
DisconnectClient {
reply_tx: JsReplyTx<Result<(), ReducerCallError>>,
client_id: ClientActorId,
},
/// See [`JsInstance::init_database`].
/// See [`JsMainInstance::init_database`].
InitDatabase {
reply_tx: JsReplyTx<anyhow::Result<Option<ReducerCallResult>>>,
program: Program,
},
/// See [`JsInstance::call_scheduled_function`].
}
/// Requests sent to a procedure JS worker thread.
enum JsProcedureWorkerRequest {
/// See [`JsProcedureInstance::call_procedure`].
CallProcedure {
reply_tx: JsReplyTx<CallProcedureReturn>,
params: CallProcedureParams,
},
/// See [`JsProcedureInstance::call_scheduled_function`].
CallScheduledFunction {
reply_tx: JsReplyTx<CallScheduledFunctionResult>,
params: ScheduledFunctionParams,
@@ -850,7 +900,7 @@ fn should_retire_worker_for_heap(
}
}
/// Performs some of the startup work of [`spawn_instance_worker`].
/// Performs some of the shared startup work for JS worker isolates.
///
/// NOTE(centril): in its own function due to lack of `try` blocks.
fn startup_instance_worker<'scope>(
@@ -929,29 +979,222 @@ where
f(&mut guard)
}
/// Spawns an instance worker for `program`
/// and returns on success the corresponding [`JsInstance`]
/// that talks to the worker.
async fn spawn_main_instance_worker(
program: Arc<str>,
module_or_mcc: Either<ModuleCommon, ModuleCreationContext>,
load_balance_guard: Arc<LoadBalanceOnDropGuard>,
core_pinner: CorePinner,
heap_policy: V8HeapPolicyConfig,
) -> anyhow::Result<(ModuleCommon, JsMainInstance)> {
spawn_instance_worker::<MainJsWorker>(program, module_or_mcc, load_balance_guard, core_pinner, heap_policy).await
}
async fn spawn_procedure_instance_worker(
program: Arc<str>,
module_or_mcc: Either<ModuleCommon, ModuleCreationContext>,
load_balance_guard: Arc<LoadBalanceOnDropGuard>,
core_pinner: CorePinner,
heap_policy: V8HeapPolicyConfig,
) -> anyhow::Result<(ModuleCommon, JsProcedureInstance)> {
spawn_instance_worker::<ProcedureJsWorker>(program, module_or_mcc, load_balance_guard, core_pinner, heap_policy)
.await
}
struct MainJsWorker;
struct ProcedureJsWorker;
trait JsWorkerSpec {
type Request: Send + 'static;
type Instance;
const KIND: JsWorkerKind;
fn make_instance(tx: mpsc::Sender<Self::Request>, trapped: Arc<AtomicBool>) -> Self::Instance;
fn handle_request(
request: Self::Request,
rt: &tokio::runtime::Handle,
instance_common: &mut InstanceCommon,
inst: &mut V8Instance<'_, '_, '_>,
module_common: &ModuleCommon,
replica_ctx: &Arc<ReplicaContext>,
) -> bool;
}
impl JsWorkerSpec for MainJsWorker {
type Request = JsMainWorkerRequest;
type Instance = JsMainInstance;
const KIND: JsWorkerKind = JsWorkerKind::Main;
fn make_instance(tx: mpsc::Sender<Self::Request>, trapped: Arc<AtomicBool>) -> Self::Instance {
JsMainInstance { tx, trapped }
}
fn handle_request(
request: Self::Request,
rt: &tokio::runtime::Handle,
instance_common: &mut InstanceCommon,
inst: &mut V8Instance<'_, '_, '_>,
module_common: &ModuleCommon,
replica_ctx: &Arc<ReplicaContext>,
) -> bool {
handle_main_worker_request(request, rt, instance_common, inst, module_common, replica_ctx)
}
}
impl JsWorkerSpec for ProcedureJsWorker {
type Request = JsProcedureWorkerRequest;
type Instance = JsProcedureInstance;
const KIND: JsWorkerKind = JsWorkerKind::Procedure;
fn make_instance(tx: mpsc::Sender<Self::Request>, trapped: Arc<AtomicBool>) -> Self::Instance {
JsProcedureInstance { tx, trapped }
}
fn handle_request(
request: Self::Request,
_rt: &tokio::runtime::Handle,
instance_common: &mut InstanceCommon,
inst: &mut V8Instance<'_, '_, '_>,
_module_common: &ModuleCommon,
_replica_ctx: &Arc<ReplicaContext>,
) -> bool {
handle_procedure_worker_request(request, instance_common, inst)
}
}
fn handle_main_worker_request(
request: JsMainWorkerRequest,
rt: &tokio::runtime::Handle,
instance_common: &mut InstanceCommon,
inst: &mut V8Instance<'_, '_, '_>,
module_common: &ModuleCommon,
replica_ctx: &Arc<ReplicaContext>,
) -> bool {
let info = module_common.info();
let mut call_reducer = |tx, params| instance_common.call_reducer_with_tx(tx, params, inst);
let mut should_exit = false;
match request {
JsMainWorkerRequest::RunFunction(f) => rt.block_on(f()),
JsMainWorkerRequest::UpdateDatabase {
reply_tx,
program,
old_module_info,
policy,
} => {
let res = instance_common.update_database(program, old_module_info, policy, inst);
send_worker_reply("update_database", reply_tx, res);
}
JsMainWorkerRequest::CallReducer { reply_tx, params } => {
let (res, trapped) = call_reducer(None, params);
send_worker_reply("call_reducer", reply_tx, res);
should_exit = trapped;
}
JsMainWorkerRequest::CallView { reply_tx, cmd } => {
let (res, trapped) = instance_common.handle_cmd(cmd, inst);
send_worker_reply("call_view", reply_tx, res);
should_exit = trapped;
}
JsMainWorkerRequest::ClearAllClients(reply_tx) => {
let res = instance_common.clear_all_clients();
send_worker_reply("clear_all_clients", reply_tx, res);
}
JsMainWorkerRequest::CallIdentityConnected {
reply_tx,
caller_auth,
caller_connection_id,
} => {
let mut trapped = false;
let res = call_identity_connected(caller_auth, caller_connection_id, &info, call_reducer, &mut trapped);
send_worker_reply("call_identity_connected", reply_tx, res);
should_exit = trapped;
}
JsMainWorkerRequest::CallIdentityDisconnected {
reply_tx,
caller_identity,
caller_connection_id,
} => {
let mut trapped = false;
let res = ModuleHost::call_identity_disconnected_inner(
caller_identity,
caller_connection_id,
&info,
call_reducer,
&mut trapped,
);
send_worker_reply("call_identity_disconnected", reply_tx, res);
should_exit = trapped;
}
JsMainWorkerRequest::DisconnectClient { reply_tx, client_id } => {
let mut trapped = false;
let res = ModuleHost::disconnect_client_inner(client_id, &info, call_reducer, &mut trapped);
send_worker_reply("disconnect_client", reply_tx, res);
should_exit = trapped;
}
JsMainWorkerRequest::InitDatabase { reply_tx, program } => {
let (res, trapped): (Result<Option<ReducerCallResult>, anyhow::Error>, bool) =
init_database(replica_ctx, &info.module_def, program, call_reducer);
send_worker_reply("init_database", reply_tx, res);
should_exit = trapped;
}
}
should_exit
}
fn handle_procedure_worker_request(
request: JsProcedureWorkerRequest,
instance_common: &mut InstanceCommon,
inst: &mut V8Instance<'_, '_, '_>,
) -> bool {
match request {
JsProcedureWorkerRequest::CallProcedure { reply_tx, params } => {
let (res, trapped) = instance_common
.call_procedure(params, inst)
.now_or_never()
.expect("our call_procedure implementation is not actually async");
send_worker_reply("call_procedure", reply_tx, res);
trapped
}
JsProcedureWorkerRequest::CallScheduledFunction { reply_tx, params } => {
let (res, trapped) = instance_common
.call_scheduled_function(params, inst)
.now_or_never()
.expect("our call_procedure implementation is not actually async");
send_worker_reply("call_scheduled_function", reply_tx, res);
trapped
}
}
}
/// Spawns an instance worker for `program` and returns on success the
/// corresponding instance handle that talks to the worker.
///
/// When [`ModuleCommon`] is passed, it's assumed that `spawn_instance_worker`
/// has already happened once for this `program` and that it has been
/// validated. In that case, `Ok(_)` should be returned.
/// When [`ModuleCommon`] is passed, it's assumed that this program has already
/// been validated. In that case, `Ok(_)` should be returned.
///
/// Otherwise, when [`ModuleCreationContext`] is passed,
/// this is the first time both the module and instance are created.
/// Otherwise, when [`ModuleCreationContext`] is passed, this is the first time
/// both the module and instance are created.
///
/// `load_balance_guard` and `core_pinner` should both be from the same
/// [`AllocatedJobCore`], and are used to manage the core pinning of this thread.
async fn spawn_instance_worker(
async fn spawn_instance_worker<W>(
program: Arc<str>,
module_or_mcc: Either<ModuleCommon, ModuleCreationContext>,
load_balance_guard: Arc<LoadBalanceOnDropGuard>,
mut core_pinner: CorePinner,
heap_policy: V8HeapPolicyConfig,
worker_kind: JsWorkerKind,
) -> anyhow::Result<(ModuleCommon, JsInstance)> {
) -> anyhow::Result<(ModuleCommon, W::Instance)>
where
W: JsWorkerSpec + 'static,
{
// This one-shot channel is used for initial startup error handling within the thread.
let (result_tx, result_rx) = oneshot::channel();
let worker_kind = W::KIND;
let (request_tx, mut request_rx) = mpsc::channel(worker_kind.request_queue_capacity());
let trapped = Arc::new(AtomicBool::new(false));
let trapped_in_thread = trapped.clone();
@@ -1039,121 +1282,21 @@ async fn spawn_instance_worker(
// Process requests to the worker.
//
// The loop is terminated when the last `JsInstance` handle is dropped.
// The loop is terminated when the last worker instance handle is dropped.
// This will cause channels, scopes, and the isolate to be cleaned up.
let mut requests_since_heap_check = 0u64;
let mut last_heap_check_at = Instant::now();
while let Some(request) = request_rx.blocking_recv() {
let mut call_reducer = |tx, params| instance_common.call_reducer_with_tx(tx, params, &mut inst);
let mut should_exit = false;
core_pinner.pin_if_changed();
match request {
JsWorkerRequest::RunFunction(f) => rt.block_on(f()),
JsWorkerRequest::UpdateDatabase {
reply_tx,
program,
old_module_info,
policy,
} => {
let res = instance_common.update_database(program, old_module_info, policy, &mut inst);
send_worker_reply("update_database", reply_tx, res);
}
JsWorkerRequest::CallReducer { reply_tx, params } => {
let (res, trapped) = call_reducer(None, params);
if trapped {
trapped_in_thread.store(true, Ordering::Relaxed);
}
send_worker_reply("call_reducer", reply_tx, res);
should_exit = trapped;
}
JsWorkerRequest::CallView { reply_tx, cmd } => {
let (res, trapped) = instance_common.handle_cmd(cmd, &mut inst);
if trapped {
trapped_in_thread.store(true, Ordering::Relaxed);
}
send_worker_reply("call_view", reply_tx, res);
should_exit = trapped;
}
JsWorkerRequest::CallProcedure { reply_tx, params } => {
let (res, trapped) = instance_common
.call_procedure(params, &mut inst)
.now_or_never()
.expect("our call_procedure implementation is not actually async");
if trapped {
trapped_in_thread.store(true, Ordering::Relaxed);
}
send_worker_reply("call_procedure", reply_tx, res);
should_exit = trapped;
}
JsWorkerRequest::ClearAllClients(reply_tx) => {
let res = instance_common.clear_all_clients();
send_worker_reply("clear_all_clients", reply_tx, res);
}
JsWorkerRequest::CallIdentityConnected {
reply_tx,
caller_auth,
caller_connection_id,
} => {
let mut trapped = false;
let res =
call_identity_connected(caller_auth, caller_connection_id, info, call_reducer, &mut trapped);
if trapped {
trapped_in_thread.store(true, Ordering::Relaxed);
}
send_worker_reply("call_identity_connected", reply_tx, res);
should_exit = trapped;
}
JsWorkerRequest::CallIdentityDisconnected {
reply_tx,
caller_identity,
caller_connection_id,
} => {
let mut trapped = false;
let res = ModuleHost::call_identity_disconnected_inner(
caller_identity,
caller_connection_id,
info,
call_reducer,
&mut trapped,
);
if trapped {
trapped_in_thread.store(true, Ordering::Relaxed);
}
send_worker_reply("call_identity_disconnected", reply_tx, res);
should_exit = trapped;
}
JsWorkerRequest::DisconnectClient { reply_tx, client_id } => {
let mut trapped = false;
let res = ModuleHost::disconnect_client_inner(client_id, info, call_reducer, &mut trapped);
if trapped {
trapped_in_thread.store(true, Ordering::Relaxed);
}
send_worker_reply("disconnect_client", reply_tx, res);
should_exit = trapped;
}
JsWorkerRequest::InitDatabase { reply_tx, program } => {
let (res, trapped): (Result<Option<ReducerCallResult>, anyhow::Error>, bool) =
init_database(replica_ctx, &module_common.info().module_def, program, call_reducer);
if trapped {
trapped_in_thread.store(true, Ordering::Relaxed);
}
send_worker_reply("init_database", reply_tx, res);
should_exit = trapped;
}
JsWorkerRequest::CallScheduledFunction { reply_tx, params } => {
let (res, trapped) = instance_common
.call_scheduled_function(params, &mut inst)
.now_or_never()
.expect("our call_procedure implementation is not actually async");
if trapped {
trapped_in_thread.store(true, Ordering::Relaxed);
}
send_worker_reply("call_scheduled_function", reply_tx, res);
should_exit = trapped;
}
}
let mut should_exit = W::handle_request(
request,
&rt,
&mut instance_common,
&mut inst,
&module_common,
replica_ctx,
);
if !should_exit && let Some(heap_metrics) = heap_metrics.as_mut() {
let request_check_due = heap_policy.heap_check_request_interval.is_some_and(|interval| {
@@ -1167,7 +1310,6 @@ async fn spawn_instance_worker(
requests_since_heap_check = 0;
last_heap_check_at = Instant::now();
if let Some((used, limit)) = should_retire_worker_for_heap(inst.scope, heap_metrics, heap_policy) {
trapped_in_thread.store(true, Ordering::Relaxed);
should_exit = true;
log::warn!(
"retiring JS worker after V8 heap stayed high post-GC: used={}MiB limit={}MiB",
@@ -1189,6 +1331,7 @@ async fn spawn_instance_worker(
// this gives it a bounded-memory replacement policy instead of
// letting one isolate absorb the entire module lifetime.
if should_exit {
trapped_in_thread.store(true, Ordering::Relaxed);
break;
}
}
@@ -1197,10 +1340,7 @@ async fn spawn_instance_worker(
// Get the module, if any, and get any setup errors from the worker.
let res: Result<ModuleCommon, anyhow::Error> = result_rx.await.expect("should have a sender");
res.map(|opt_mc| {
let inst = JsInstance {
tx: request_tx,
trapped,
};
let inst = W::make_instance(request_tx, trapped);
(opt_mc, inst)
})
}