mirror of
https://github.com/clockworklabs/SpacetimeDB.git
synced 2026-05-06 07:26:43 -04:00
Replace JsInstance pool with single worker and FIFO queue (#4663)
# Description of Changes Before this change, JS reducer requests borrowed a `JsInstance` from a pool. If no idle instance was available, we created another instance, which meant another V8 worker thread. Under load, this meant reducers bouncing across multiple OS threads. After this change, JS reducers go through a single long-lived `JsInstance` fed by a FIFO queue which results in much better cache locality. More accurately, each module now allocates a single OS thread, on which reducers (and most operations) run. Modules do not share workers/threads. And modules do not create multiple threads for running reducers. Note, the original instance pool is still used for procedures. It should probably be bounded, but I didn't make any changes to it. It's also used for executing views during initial subscription to avoid a reentrancy deadlock. The latter should be fixed and moved over to the JS worker thread at some point. # API and ABI breaking changes N/A # Expected complexity level and risk 4 # Testing ``` NODE_OPTIONS="--max-old-space-size=8192" \ MAX_INFLIGHT_PER_WORKER=512 \ BENCH_PRECOMPUTED_TRANSFER_PAIRS=1000000 \ pnpm bench test-1 --seconds 10 --concurrency 50 --alpha 1.5 --connectors spacetimedb ``` ``` 50K TPS -> 85K TPS on m2 mac ```
This commit is contained in:
@@ -98,6 +98,7 @@ fn map_reducer_error(e: ReducerCallError, reducer: &str) -> (StatusCode, String)
|
||||
log::debug!("Attempt to call non-existent reducer {reducer}");
|
||||
StatusCode::NOT_FOUND
|
||||
}
|
||||
ReducerCallError::WorkerError(_) => StatusCode::INTERNAL_SERVER_ERROR,
|
||||
ReducerCallError::LifecycleReducer(lifecycle) => {
|
||||
log::debug!("Attempt to call {lifecycle:?} lifecycle reducer {reducer}");
|
||||
StatusCode::BAD_REQUEST
|
||||
|
||||
@@ -11,7 +11,7 @@ use crate::error::DBError;
|
||||
use crate::estimation::{check_row_limit, estimate_rows_scanned};
|
||||
use crate::hash::Hash;
|
||||
use crate::host::host_controller::CallProcedureReturn;
|
||||
use crate::host::scheduler::{CallScheduledFunctionResult, ScheduledFunctionParams};
|
||||
use crate::host::scheduler::{CallScheduledFunctionError, CallScheduledFunctionResult, ScheduledFunctionParams};
|
||||
use crate::host::v8::JsInstance;
|
||||
pub use crate::host::wasm_common::module_host_actor::{InstanceCommon, WasmInstance};
|
||||
use crate::host::wasmtime::ModuleInstance;
|
||||
@@ -357,7 +357,9 @@ struct WasmtimeModuleHost {
|
||||
}
|
||||
|
||||
struct V8ModuleHost {
|
||||
instance_manager: ModuleInstanceManager<super::v8::JsModule>,
|
||||
module: super::v8::JsModule,
|
||||
instance_lane: super::v8::JsInstanceLane,
|
||||
procedure_instances: ModuleInstanceManager<super::v8::JsModule>,
|
||||
}
|
||||
|
||||
/// A module; used as a bound on `InstanceManager`.
|
||||
@@ -813,7 +815,7 @@ impl CreateInstanceTimeMetric {
|
||||
}
|
||||
|
||||
impl<M: GenericModule> ModuleInstanceManager<M> {
|
||||
fn new(module: M, init_inst: M::Instance, database_identity: Identity) -> Self {
|
||||
fn new(module: M, init_inst: Option<M::Instance>, database_identity: Identity) -> Self {
|
||||
let host_type = module.host_type();
|
||||
let module_instances_metric = ModuleInstancesMetric {
|
||||
metric: WORKER_METRICS
|
||||
@@ -832,9 +834,8 @@ impl<M: GenericModule> ModuleInstanceManager<M> {
|
||||
database_identity,
|
||||
};
|
||||
|
||||
// Add the first instance.
|
||||
let mut instances = VecDeque::new();
|
||||
instances.push_front(init_inst);
|
||||
instances.extend(init_inst);
|
||||
|
||||
Self {
|
||||
instances: Mutex::new(instances),
|
||||
@@ -937,6 +938,8 @@ pub enum ReducerCallError {
|
||||
Args(#[from] InvalidReducerArguments),
|
||||
#[error(transparent)]
|
||||
NoSuchModule(#[from] NoSuchModule),
|
||||
#[error("The reducer worker encountered a fatal error: {0}")]
|
||||
WorkerError(String),
|
||||
#[error("no such reducer")]
|
||||
NoSuchReducer,
|
||||
#[error("no such scheduled reducer")]
|
||||
@@ -1066,7 +1069,7 @@ impl ModuleHost {
|
||||
init_inst,
|
||||
} => {
|
||||
info = module.info();
|
||||
let instance_manager = ModuleInstanceManager::new(module, init_inst, database_identity);
|
||||
let instance_manager = ModuleInstanceManager::new(module, Some(init_inst), database_identity);
|
||||
Arc::new(ModuleHostInner::Wasm(WasmtimeModuleHost {
|
||||
executor,
|
||||
instance_manager,
|
||||
@@ -1074,8 +1077,13 @@ impl ModuleHost {
|
||||
}
|
||||
ModuleWithInstance::Js { module, init_inst } => {
|
||||
info = module.info();
|
||||
let instance_manager = ModuleInstanceManager::new(module, init_inst, database_identity);
|
||||
Arc::new(ModuleHostInner::Js(V8ModuleHost { instance_manager }))
|
||||
let instance_lane = super::v8::JsInstanceLane::new(module.clone(), init_inst);
|
||||
let procedure_instances = ModuleInstanceManager::new(module.clone(), None, database_identity);
|
||||
Arc::new(ModuleHostInner::Js(V8ModuleHost {
|
||||
module,
|
||||
instance_lane,
|
||||
procedure_instances,
|
||||
}))
|
||||
}
|
||||
};
|
||||
let on_panic = Arc::new(on_panic);
|
||||
@@ -1143,18 +1151,13 @@ impl ModuleHost {
|
||||
})
|
||||
.await
|
||||
}
|
||||
ModuleHostInner::Js(V8ModuleHost { instance_manager }) => {
|
||||
instance_manager
|
||||
.with_instance(async |mut inst| {
|
||||
let res = inst
|
||||
.run_on_thread(async move || {
|
||||
drop(timer_guard);
|
||||
f().await
|
||||
})
|
||||
.await;
|
||||
(res, inst)
|
||||
ModuleHostInner::Js(V8ModuleHost { instance_lane, .. }) => {
|
||||
instance_lane
|
||||
.run_on_thread(async move || {
|
||||
drop(timer_guard);
|
||||
f().await
|
||||
})
|
||||
.await
|
||||
.await?
|
||||
}
|
||||
})
|
||||
}
|
||||
@@ -1193,7 +1196,7 @@ impl ModuleHost {
|
||||
arg: A,
|
||||
timer: impl FnOnce(&str) -> Guard,
|
||||
work_wasm: impl AsyncFnOnce(Guard, &SingleCoreExecutor, Box<ModuleInstance>, A) -> (R, Box<ModuleInstance>),
|
||||
work_js: impl AsyncFnOnce(Guard, &mut JsInstance, A) -> R,
|
||||
work_js: impl AsyncFnOnce(Guard, &super::v8::JsInstanceLane, A) -> R,
|
||||
) -> Result<R, NoSuchModule> {
|
||||
self.guard_closed()?;
|
||||
let timer_guard = timer(label);
|
||||
@@ -1220,11 +1223,7 @@ impl ModuleHost {
|
||||
.with_instance(async |inst| work_wasm(timer_guard, executor, inst, arg).await)
|
||||
.await
|
||||
}
|
||||
ModuleHostInner::Js(V8ModuleHost { instance_manager }) => {
|
||||
instance_manager
|
||||
.with_instance(async |mut inst| (work_js(timer_guard, &mut inst, arg).await, inst))
|
||||
.await
|
||||
}
|
||||
ModuleHostInner::Js(V8ModuleHost { instance_lane, .. }) => work_js(timer_guard, instance_lane, arg).await,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -1237,7 +1236,7 @@ impl ModuleHost {
|
||||
label: &str,
|
||||
arg: A,
|
||||
wasm: impl AsyncFnOnce(A, &mut ModuleInstance) -> R + Send + 'static,
|
||||
js: impl AsyncFnOnce(A, &mut JsInstance) -> R,
|
||||
js: impl AsyncFnOnce(A, &super::v8::JsInstanceLane) -> R,
|
||||
) -> Result<R, NoSuchModule>
|
||||
where
|
||||
R: Send + 'static,
|
||||
@@ -1262,6 +1261,7 @@ impl ModuleHost {
|
||||
.await
|
||||
},
|
||||
async move |timer_guard, inst, arg| {
|
||||
super::v8::assert_not_on_js_module_thread(label);
|
||||
drop(timer_guard);
|
||||
js(arg, inst).await
|
||||
},
|
||||
@@ -1269,6 +1269,70 @@ impl ModuleHost {
|
||||
.await
|
||||
}
|
||||
|
||||
/// Run a function for this module using pooled instances.
|
||||
///
|
||||
/// For WASM, this is identical to [`Self::call`].
|
||||
/// For V8/JS, this uses the pooled procedure instances instead of the
|
||||
/// single instance lane.
|
||||
async fn call_pooled<A, R>(
|
||||
&self,
|
||||
label: &str,
|
||||
arg: A,
|
||||
wasm: impl AsyncFnOnce(A, &mut ModuleInstance) -> R + Send + 'static,
|
||||
js: impl AsyncFnOnce(A, &JsInstance) -> R,
|
||||
) -> Result<R, NoSuchModule>
|
||||
where
|
||||
R: Send + 'static,
|
||||
A: Send + 'static,
|
||||
{
|
||||
self.guard_closed()?;
|
||||
let timer_guard = self.start_call_timer(label);
|
||||
|
||||
scopeguard::defer_on_unwind!({
|
||||
log::warn!("pooled operation {label} panicked");
|
||||
(self.on_panic)();
|
||||
});
|
||||
|
||||
Ok(match &*self.inner {
|
||||
ModuleHostInner::Wasm(WasmtimeModuleHost {
|
||||
executor,
|
||||
instance_manager,
|
||||
}) => {
|
||||
instance_manager
|
||||
.with_instance(async |mut inst| {
|
||||
executor
|
||||
.run_job(async move || {
|
||||
drop(timer_guard);
|
||||
(wasm(arg, &mut inst).await, inst)
|
||||
})
|
||||
.await
|
||||
})
|
||||
.await
|
||||
}
|
||||
ModuleHostInner::Js(V8ModuleHost {
|
||||
procedure_instances, ..
|
||||
}) => {
|
||||
procedure_instances
|
||||
.with_instance(async |inst| {
|
||||
drop(timer_guard);
|
||||
let res = js(arg, &inst).await;
|
||||
(res, inst)
|
||||
})
|
||||
.await
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
async fn call_view_command(&self, label: &str, cmd: ViewCommand) -> Result<ViewCommandResult, ViewCallError> {
|
||||
self.call_pooled(
|
||||
label,
|
||||
cmd,
|
||||
async |cmd, inst| Ok::<_, ViewCallError>(inst.call_view(cmd)),
|
||||
async |cmd, inst| Ok::<_, ViewCallError>(inst.call_view(cmd).await),
|
||||
)
|
||||
.await?
|
||||
}
|
||||
|
||||
pub async fn disconnect_client(&self, client_id: ClientActorId) {
|
||||
log::trace!("disconnecting client {client_id}");
|
||||
if let Err(e) = self
|
||||
@@ -1536,14 +1600,13 @@ impl ModuleHost {
|
||||
args,
|
||||
};
|
||||
|
||||
Ok(self
|
||||
.call(
|
||||
&reducer_def.name,
|
||||
call_reducer_params,
|
||||
async |p, inst| inst.call_reducer(p),
|
||||
async |p, inst| inst.call_reducer(p).await,
|
||||
)
|
||||
.await?)
|
||||
self.call(
|
||||
&reducer_def.name,
|
||||
call_reducer_params,
|
||||
async |p, inst| Ok(inst.call_reducer(p)),
|
||||
async |p, inst| inst.call_reducer(p).await,
|
||||
)
|
||||
.await?
|
||||
}
|
||||
|
||||
pub async fn call_reducer(
|
||||
@@ -1611,12 +1674,7 @@ impl ModuleHost {
|
||||
};
|
||||
|
||||
let res = self
|
||||
.call(
|
||||
"call_view_add_single_subscription",
|
||||
cmd,
|
||||
async |cmd, inst| inst.call_view(cmd),
|
||||
async |cmd, inst| inst.call_view(cmd).await,
|
||||
)
|
||||
.call_view_command("call_view_add_single_subscription", cmd)
|
||||
.await
|
||||
//TODO: handle error better
|
||||
.map_err(|e| DBError::Other(anyhow::anyhow!(e)))?;
|
||||
@@ -1644,12 +1702,7 @@ impl ModuleHost {
|
||||
};
|
||||
|
||||
let res = self
|
||||
.call(
|
||||
"call_view_add_multi_subscription",
|
||||
cmd,
|
||||
async |cmd, inst| inst.call_view(cmd),
|
||||
async |cmd, inst| inst.call_view(cmd).await,
|
||||
)
|
||||
.call_view_command("call_view_add_multi_subscription", cmd)
|
||||
.await
|
||||
//TODO: handle error better
|
||||
.map_err(|e| DBError::Other(anyhow::anyhow!(e)))?;
|
||||
@@ -1677,12 +1730,7 @@ impl ModuleHost {
|
||||
};
|
||||
|
||||
let res = self
|
||||
.call(
|
||||
"call_view_remove_v2_subscription",
|
||||
cmd,
|
||||
async |cmd, inst| inst.call_view(cmd),
|
||||
async |cmd, inst| inst.call_view(cmd).await,
|
||||
)
|
||||
.call_view_command("call_view_remove_v2_subscription", cmd)
|
||||
.await
|
||||
.map_err(|e| DBError::Other(anyhow::anyhow!(e)))?;
|
||||
|
||||
@@ -1709,12 +1757,7 @@ impl ModuleHost {
|
||||
};
|
||||
|
||||
let res = self
|
||||
.call(
|
||||
"call_view_add_multi_subscription",
|
||||
cmd,
|
||||
async |cmd, inst| inst.call_view(cmd),
|
||||
async |cmd, inst| inst.call_view(cmd).await,
|
||||
)
|
||||
.call_view_command("call_view_add_multi_subscription", cmd)
|
||||
.await
|
||||
//TODO: handle error better
|
||||
.map_err(|e| DBError::Other(anyhow::anyhow!(e)))?;
|
||||
@@ -1742,12 +1785,7 @@ impl ModuleHost {
|
||||
};
|
||||
|
||||
let res = self
|
||||
.call(
|
||||
"call_view_add_legacy_subscription",
|
||||
cmd,
|
||||
async |cmd, inst| inst.call_view(cmd),
|
||||
async |cmd, inst| inst.call_view(cmd).await,
|
||||
)
|
||||
.call_view_command("call_view_add_legacy_subscription", cmd)
|
||||
.await
|
||||
//TODO: handle error better
|
||||
.map_err(|e| DBError::Other(anyhow::anyhow!(e)))?;
|
||||
@@ -1776,12 +1814,7 @@ impl ModuleHost {
|
||||
};
|
||||
|
||||
let res = self
|
||||
.call(
|
||||
"call_view_sql",
|
||||
cmd,
|
||||
async |cmd, inst| inst.call_view(cmd),
|
||||
async |cmd, inst| inst.call_view(cmd).await,
|
||||
)
|
||||
.call_view_command("call_view_sql", cmd)
|
||||
.await
|
||||
//TODO: handle error better
|
||||
.map_err(|e| DBError::Other(anyhow::anyhow!(e)))?;
|
||||
@@ -1885,7 +1918,7 @@ impl ModuleHost {
|
||||
name: &str,
|
||||
params: CallProcedureParams,
|
||||
) -> Result<CallProcedureReturn, NoSuchModule> {
|
||||
self.call(
|
||||
self.call_pooled(
|
||||
name,
|
||||
params,
|
||||
async move |params, inst| inst.call_procedure(params).await,
|
||||
@@ -1897,14 +1930,14 @@ impl ModuleHost {
|
||||
pub(super) async fn call_scheduled_function(
|
||||
&self,
|
||||
params: ScheduledFunctionParams,
|
||||
) -> Result<CallScheduledFunctionResult, NoSuchModule> {
|
||||
self.call(
|
||||
) -> Result<CallScheduledFunctionResult, CallScheduledFunctionError> {
|
||||
self.call_pooled(
|
||||
"unknown scheduled function",
|
||||
params,
|
||||
async move |params, inst| inst.call_scheduled_function(params).await,
|
||||
async move |params, inst| inst.call_scheduled_function(params).await,
|
||||
async move |params, inst| Ok(inst.call_scheduled_function(params).await),
|
||||
async move |params, inst| Ok(inst.call_scheduled_function(params).await),
|
||||
)
|
||||
.await
|
||||
.await?
|
||||
}
|
||||
|
||||
/// Materializes the views return by the `view_collector`, if not already materialized,
|
||||
@@ -2467,14 +2500,14 @@ impl ModuleHost {
|
||||
pub(crate) fn replica_ctx(&self) -> &ReplicaContext {
|
||||
match &*self.inner {
|
||||
ModuleHostInner::Wasm(wasm) => wasm.instance_manager.module.replica_ctx(),
|
||||
ModuleHostInner::Js(js) => js.instance_manager.module.replica_ctx(),
|
||||
ModuleHostInner::Js(js) => js.module.replica_ctx(),
|
||||
}
|
||||
}
|
||||
|
||||
fn scheduler(&self) -> &Scheduler {
|
||||
match &*self.inner {
|
||||
ModuleHostInner::Wasm(wasm) => wasm.instance_manager.module.scheduler(),
|
||||
ModuleHostInner::Js(js) => js.instance_manager.module.scheduler(),
|
||||
ModuleHostInner::Js(js) => js.module.scheduler(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -268,13 +268,21 @@ struct SchedulerActor {
|
||||
module_host: WeakModuleHost,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
enum QueueItem {
|
||||
Id { id: ScheduledFunctionId, at: Timestamp },
|
||||
VolatileNonatomicImmediate { function_name: String, args: FunctionArgs },
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub(crate) struct ScheduledFunctionParams(QueueItem);
|
||||
|
||||
#[derive(thiserror::Error, Debug)]
|
||||
pub(crate) enum CallScheduledFunctionError {
|
||||
#[error(transparent)]
|
||||
NoSuchModule(#[from] NoSuchModule),
|
||||
}
|
||||
|
||||
#[cfg(target_pointer_width = "64")]
|
||||
spacetimedb_table::static_assert_size!(QueueItem, 64);
|
||||
|
||||
@@ -320,8 +328,8 @@ impl SchedulerActor {
|
||||
|
||||
async fn handle_queued(&mut self, id: Expired<QueueItem>) {
|
||||
let item = id.into_inner();
|
||||
let id: Option<ScheduledFunctionId> = match item {
|
||||
QueueItem::Id { id, .. } => Some(id),
|
||||
let id: Option<ScheduledFunctionId> = match &item {
|
||||
QueueItem::Id { id, .. } => Some(*id),
|
||||
QueueItem::VolatileNonatomicImmediate { .. } => None,
|
||||
};
|
||||
if let Some(id) = id {
|
||||
@@ -337,7 +345,7 @@ impl SchedulerActor {
|
||||
match result {
|
||||
// If the module already exited, leave the `ScheduledFunction` in
|
||||
// the database for when the module restarts.
|
||||
Err(NoSuchModule) => {}
|
||||
Err(CallScheduledFunctionError::NoSuchModule(_)) => {}
|
||||
Ok(CallScheduledFunctionResult { reschedule: None }) => {
|
||||
// nothing to do
|
||||
}
|
||||
|
||||
+521
-179
@@ -16,7 +16,7 @@ use crate::client::ClientActorId;
|
||||
use crate::host::host_controller::CallProcedureReturn;
|
||||
use crate::host::instance_env::{ChunkPool, InstanceEnv, TxSlot};
|
||||
use crate::host::module_host::{
|
||||
call_identity_connected, init_database, ClientConnectedError, ViewCommand, ViewCommandResult,
|
||||
call_identity_connected, init_database, ClientConnectedError, ViewCallError, ViewCommand, ViewCommandResult,
|
||||
};
|
||||
use crate::host::scheduler::{CallScheduledFunctionResult, ScheduledFunctionParams};
|
||||
use crate::host::wasm_common::instrumentation::CallTimes;
|
||||
@@ -33,10 +33,10 @@ use crate::subscription::module_subscription_manager::TransactionOffset;
|
||||
use crate::util::jobs::{AllocatedJobCore, CorePinner, LoadBalanceOnDropGuard};
|
||||
use core::any::type_name;
|
||||
use core::str;
|
||||
use enum_as_inner::EnumAsInner;
|
||||
use futures::future::LocalBoxFuture;
|
||||
use futures::FutureExt;
|
||||
use itertools::Either;
|
||||
use parking_lot::RwLock;
|
||||
use spacetimedb_auth::identity::ConnectionAuthCtx;
|
||||
use spacetimedb_client_api_messages::energy::FunctionBudget;
|
||||
use spacetimedb_datastore::locking_tx_datastore::FuncCallType;
|
||||
@@ -46,10 +46,12 @@ use spacetimedb_schema::auto_migrate::MigrationPolicy;
|
||||
use spacetimedb_schema::def::ModuleDef;
|
||||
use spacetimedb_schema::identifier::Identifier;
|
||||
use spacetimedb_table::static_assert_size;
|
||||
use std::cell::Cell;
|
||||
use std::panic::AssertUnwindSafe;
|
||||
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
|
||||
use std::sync::{Arc, LazyLock};
|
||||
use std::time::Instant;
|
||||
use tokio::sync::oneshot;
|
||||
use tokio::sync::{oneshot, Mutex as AsyncMutex};
|
||||
use tracing::Instrument;
|
||||
use v8::script_compiler::{compile_module, Source};
|
||||
use v8::{
|
||||
@@ -93,6 +95,53 @@ impl V8Runtime {
|
||||
}
|
||||
|
||||
static V8_RUNTIME_GLOBAL: LazyLock<V8RuntimeInner> = LazyLock::new(V8RuntimeInner::init);
|
||||
static NEXT_JS_INSTANCE_ID: AtomicU64 = AtomicU64::new(1);
|
||||
|
||||
thread_local! {
|
||||
// Note, `on_module_thread` runs host closures on a single JS module thread.
|
||||
// Enqueuing more JS module-thread work from one of those closures waits on the
|
||||
// same worker thread that is already busy running the current closure.
|
||||
// And this deadlocks.
|
||||
static ON_JS_MODULE_THREAD: Cell<bool> = const { Cell::new(false) };
|
||||
}
|
||||
|
||||
struct EnteredJsModuleThread;
|
||||
|
||||
impl EnteredJsModuleThread {
|
||||
fn new() -> Self {
|
||||
ON_JS_MODULE_THREAD.with(|entered| {
|
||||
assert!(
|
||||
!entered.get(),
|
||||
"reentrancy into the JS module thread; this would deadlock. \
|
||||
Do not enqueue onto this worker from inside `on_module_thread` work."
|
||||
);
|
||||
entered.set(true);
|
||||
});
|
||||
Self
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for EnteredJsModuleThread {
|
||||
fn drop(&mut self) {
|
||||
ON_JS_MODULE_THREAD.with(|entered| {
|
||||
debug_assert!(
|
||||
entered.get(),
|
||||
"JS module thread marker should only be cleared after entry"
|
||||
);
|
||||
entered.set(false);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn assert_not_on_js_module_thread(label: &str) {
|
||||
ON_JS_MODULE_THREAD.with(|entered| {
|
||||
assert!(
|
||||
!entered.get(),
|
||||
"{label} attempted to re-enter the JS module thread from code already \
|
||||
running on that thread; this would deadlock"
|
||||
);
|
||||
});
|
||||
}
|
||||
|
||||
/// The actual V8 runtime, with initialization of V8.
|
||||
struct V8RuntimeInner {
|
||||
@@ -290,52 +339,52 @@ impl JsInstanceEnv {
|
||||
/// The actual work happens in a worker thread,
|
||||
/// which the instance communicates with through channels.
|
||||
///
|
||||
/// When the instance is dropped, the channels will hang up,
|
||||
/// which will cause the worker's loop to terminate
|
||||
/// and cleanup the isolate and friends.
|
||||
/// This handle is cloneable and shared by callers. Requests are queued FIFO
|
||||
/// on the worker thread so the next reducer can start immediately after the
|
||||
/// previous one finishes, without waiting for an outer task to hand the
|
||||
/// instance back.
|
||||
///
|
||||
/// When the last handle is dropped, the channels will hang up,
|
||||
/// which will cause the worker's loop to terminate and cleanup the isolate
|
||||
/// and friends.
|
||||
#[derive(Clone)]
|
||||
pub struct JsInstance {
|
||||
/// Stable identifier for the underlying worker generation.
|
||||
///
|
||||
/// All clones of the same handle share the same `id`. The instance lane uses
|
||||
/// it to tell whether the currently active worker has already been replaced
|
||||
/// after a trap or disconnect.
|
||||
id: u64,
|
||||
request_tx: flume::Sender<JsWorkerRequest>,
|
||||
reply_rx: flume::Receiver<(JsWorkerReply, bool)>,
|
||||
trapped: bool,
|
||||
trapped: Arc<AtomicBool>,
|
||||
}
|
||||
|
||||
impl JsInstance {
|
||||
fn id(&self) -> u64 {
|
||||
self.id
|
||||
}
|
||||
|
||||
pub fn trapped(&self) -> bool {
|
||||
self.trapped
|
||||
self.trapped.load(Ordering::Relaxed)
|
||||
}
|
||||
|
||||
/// Send a request to the worker and wait for a reply.
|
||||
async fn send_recv<T>(
|
||||
&mut self,
|
||||
extract: impl FnOnce(JsWorkerReply) -> Result<T, JsWorkerReply>,
|
||||
request: JsWorkerRequest,
|
||||
) -> T {
|
||||
// Send the request.
|
||||
async fn send_request<T>(
|
||||
&self,
|
||||
request: impl FnOnce(JsReplyTx<T>) -> JsWorkerRequest,
|
||||
) -> Result<T, WorkerDisconnected> {
|
||||
let (reply_tx, reply_rx) = oneshot::channel();
|
||||
self.request_tx
|
||||
.send_async(request)
|
||||
.send_async(request(reply_tx))
|
||||
.await
|
||||
.expect("worker's `request_rx` should be live as `JsInstance::drop` hasn't happened");
|
||||
|
||||
// Wait for the response.
|
||||
let (reply, trapped) = self
|
||||
.reply_rx
|
||||
.recv_async()
|
||||
.await
|
||||
.expect("worker's `reply_tx` should be live as `JsInstance::drop` hasn't happened");
|
||||
|
||||
self.trapped = trapped;
|
||||
|
||||
match extract(reply) {
|
||||
Err(err) => unreachable!("should have received {} but got {err:?}", type_name::<T>()),
|
||||
Ok(reply) => reply,
|
||||
.map_err(|_| WorkerDisconnected)?;
|
||||
let JsWorkerReply { value, trapped } = reply_rx.await.map_err(|_| WorkerDisconnected)?;
|
||||
if trapped {
|
||||
self.trapped.store(true, Ordering::Relaxed);
|
||||
}
|
||||
Ok(value)
|
||||
}
|
||||
|
||||
/// Run the given function on the worker thread.
|
||||
///
|
||||
/// This method, unlike the others, does not expect a response on the
|
||||
/// `reply_rx` channel, since the return value `R` could be of any type.
|
||||
pub async fn run_on_thread<F, R>(&mut self, f: F) -> R
|
||||
pub async fn run_on_thread<F, R>(&self, f: F) -> R
|
||||
where
|
||||
F: AsyncFnOnce() -> R + Send + 'static,
|
||||
R: Send + 'static,
|
||||
@@ -343,145 +392,137 @@ impl JsInstance {
|
||||
let span = tracing::Span::current();
|
||||
let (tx, rx) = oneshot::channel();
|
||||
|
||||
let request = JsWorkerRequest::RunFunction(Box::new(move || {
|
||||
async move {
|
||||
let result = AssertUnwindSafe(f().instrument(span)).catch_unwind().await;
|
||||
if let Err(Err(_panic)) = tx.send(result) {
|
||||
tracing::warn!("uncaught panic on `SingleCoreExecutor`")
|
||||
}
|
||||
}
|
||||
.boxed_local()
|
||||
}));
|
||||
|
||||
self.request_tx
|
||||
.send_async(request)
|
||||
.send_async(JsWorkerRequest::RunFunction(Box::new(move || {
|
||||
async move {
|
||||
let result = AssertUnwindSafe(f().instrument(span)).catch_unwind().await;
|
||||
if let Err(Err(_panic)) = tx.send(result) {
|
||||
tracing::warn!("uncaught panic on `SingleCoreExecutor`")
|
||||
}
|
||||
}
|
||||
.boxed_local()
|
||||
})))
|
||||
.await
|
||||
.expect("worker's `request_rx` should be live as `JsInstance::drop` hasn't happened");
|
||||
.unwrap_or_else(|_| panic!("worker should stay live while handling {}", type_name::<R>()));
|
||||
|
||||
match rx.await.unwrap() {
|
||||
match rx
|
||||
.await
|
||||
.unwrap_or_else(|_| panic!("worker should stay live while handling {}", type_name::<R>()))
|
||||
{
|
||||
Ok(r) => r,
|
||||
Err(e) => std::panic::resume_unwind(e),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn update_database(
|
||||
&mut self,
|
||||
&self,
|
||||
program: Program,
|
||||
old_module_info: Arc<ModuleInfo>,
|
||||
policy: MigrationPolicy,
|
||||
) -> anyhow::Result<UpdateDatabaseResult> {
|
||||
self.send_recv(
|
||||
JsWorkerReply::into_update_database,
|
||||
JsWorkerRequest::UpdateDatabase {
|
||||
program,
|
||||
old_module_info,
|
||||
policy,
|
||||
},
|
||||
)
|
||||
self.send_request(|reply_tx| JsWorkerRequest::UpdateDatabase {
|
||||
reply_tx,
|
||||
program,
|
||||
old_module_info,
|
||||
policy,
|
||||
})
|
||||
.await
|
||||
.unwrap_or_else(|_| panic!("worker should stay live while updating the database"))
|
||||
}
|
||||
|
||||
pub async fn call_reducer(&mut self, params: CallReducerParams) -> ReducerCallResult {
|
||||
self.send_recv(
|
||||
JsWorkerReply::into_call_reducer,
|
||||
JsWorkerRequest::CallReducer { params },
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn clear_all_clients(&mut self) -> anyhow::Result<()> {
|
||||
self.send_recv(JsWorkerReply::into_clear_all_clients, JsWorkerRequest::ClearAllClients)
|
||||
pub async fn call_reducer(&self, params: CallReducerParams) -> ReducerCallResult {
|
||||
self.send_request(|reply_tx| JsWorkerRequest::CallReducer { reply_tx, params })
|
||||
.await
|
||||
.unwrap_or_else(|_| panic!("worker should stay live while calling a reducer"))
|
||||
}
|
||||
|
||||
pub async fn clear_all_clients(&self) -> anyhow::Result<()> {
|
||||
self.send_request(JsWorkerRequest::ClearAllClients)
|
||||
.await
|
||||
.unwrap_or_else(|_| panic!("worker should stay live while clearing clients"))
|
||||
}
|
||||
|
||||
pub async fn call_identity_connected(
|
||||
&mut self,
|
||||
&self,
|
||||
caller_auth: ConnectionAuthCtx,
|
||||
caller_connection_id: ConnectionId,
|
||||
) -> Result<(), ClientConnectedError> {
|
||||
self.send_recv(
|
||||
JsWorkerReply::into_call_identity_connected,
|
||||
JsWorkerRequest::CallIdentityConnected(caller_auth, caller_connection_id),
|
||||
)
|
||||
self.send_request(|reply_tx| JsWorkerRequest::CallIdentityConnected {
|
||||
reply_tx,
|
||||
caller_auth,
|
||||
caller_connection_id,
|
||||
})
|
||||
.await
|
||||
.unwrap_or_else(|_| panic!("worker should stay live while running client_connected"))
|
||||
}
|
||||
|
||||
pub async fn call_identity_disconnected(
|
||||
&mut self,
|
||||
&self,
|
||||
caller_identity: Identity,
|
||||
caller_connection_id: ConnectionId,
|
||||
) -> Result<(), ReducerCallError> {
|
||||
self.send_recv(
|
||||
JsWorkerReply::into_call_identity_disconnected,
|
||||
JsWorkerRequest::CallIdentityDisconnected(caller_identity, caller_connection_id),
|
||||
)
|
||||
self.send_request(|reply_tx| JsWorkerRequest::CallIdentityDisconnected {
|
||||
reply_tx,
|
||||
caller_identity,
|
||||
caller_connection_id,
|
||||
})
|
||||
.await
|
||||
.unwrap_or_else(|_| panic!("worker should stay live while running client_disconnected"))
|
||||
}
|
||||
|
||||
pub async fn disconnect_client(&mut self, client_id: ClientActorId) -> Result<(), ReducerCallError> {
|
||||
self.send_recv(
|
||||
JsWorkerReply::into_disconnect_client,
|
||||
JsWorkerRequest::DisconnectClient(client_id),
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn init_database(&mut self, program: Program) -> anyhow::Result<Option<ReducerCallResult>> {
|
||||
*self
|
||||
.send_recv(
|
||||
JsWorkerReply::into_init_database,
|
||||
JsWorkerRequest::InitDatabase(program),
|
||||
)
|
||||
pub async fn disconnect_client(&self, client_id: ClientActorId) -> Result<(), ReducerCallError> {
|
||||
self.send_request(|reply_tx| JsWorkerRequest::DisconnectClient { reply_tx, client_id })
|
||||
.await
|
||||
.unwrap_or_else(|_| panic!("worker should stay live while disconnecting a client"))
|
||||
}
|
||||
|
||||
pub async fn call_procedure(&mut self, params: CallProcedureParams) -> CallProcedureReturn {
|
||||
*self
|
||||
.send_recv(
|
||||
JsWorkerReply::into_call_procedure,
|
||||
JsWorkerRequest::CallProcedure { params },
|
||||
)
|
||||
pub async fn init_database(&self, program: Program) -> anyhow::Result<Option<ReducerCallResult>> {
|
||||
self.send_request(|reply_tx| JsWorkerRequest::InitDatabase { reply_tx, program })
|
||||
.await
|
||||
.unwrap_or_else(|_| panic!("worker should stay live while initializing the database"))
|
||||
}
|
||||
|
||||
pub async fn call_view(&mut self, cmd: ViewCommand) -> ViewCommandResult {
|
||||
*self
|
||||
.send_recv(JsWorkerReply::into_call_view, JsWorkerRequest::CallView { cmd })
|
||||
pub async fn call_procedure(&self, params: CallProcedureParams) -> CallProcedureReturn {
|
||||
self.send_request(|reply_tx| JsWorkerRequest::CallProcedure { reply_tx, params })
|
||||
.await
|
||||
.unwrap_or_else(|_| panic!("worker should stay live while calling a procedure"))
|
||||
}
|
||||
|
||||
pub async fn call_view(&self, cmd: ViewCommand) -> ViewCommandResult {
|
||||
self.send_request(|reply_tx| JsWorkerRequest::CallView { reply_tx, cmd })
|
||||
.await
|
||||
.unwrap_or_else(|_| panic!("worker should stay live while calling a view"))
|
||||
}
|
||||
|
||||
pub(in crate::host) async fn call_scheduled_function(
|
||||
&mut self,
|
||||
&self,
|
||||
params: ScheduledFunctionParams,
|
||||
) -> CallScheduledFunctionResult {
|
||||
self.send_recv(
|
||||
JsWorkerReply::into_call_scheduled_function,
|
||||
JsWorkerRequest::CallScheduledFunction(params),
|
||||
)
|
||||
.await
|
||||
self.send_request(|reply_tx| JsWorkerRequest::CallScheduledFunction { reply_tx, params })
|
||||
.await
|
||||
.unwrap_or_else(|_| panic!("worker should stay live while calling a scheduled function"))
|
||||
}
|
||||
}
|
||||
|
||||
/// A reply from the worker in [`spawn_instance_worker`].
|
||||
#[derive(EnumAsInner, Debug)]
|
||||
enum JsWorkerReply {
|
||||
UpdateDatabase(anyhow::Result<UpdateDatabaseResult>),
|
||||
CallReducer(ReducerCallResult),
|
||||
CallView(Box<ViewCommandResult>),
|
||||
CallProcedure(Box<CallProcedureReturn>),
|
||||
ClearAllClients(anyhow::Result<()>),
|
||||
CallIdentityConnected(Result<(), ClientConnectedError>),
|
||||
CallIdentityDisconnected(Result<(), ReducerCallError>),
|
||||
DisconnectClient(Result<(), ReducerCallError>),
|
||||
InitDatabase(Box<anyhow::Result<Option<ReducerCallResult>>>),
|
||||
CallScheduledFunction(CallScheduledFunctionResult),
|
||||
#[derive(Clone, Copy, Debug)]
|
||||
struct WorkerDisconnected;
|
||||
|
||||
fn instance_lane_worker_error(label: &'static str) -> String {
|
||||
format!("instance lane worker exited while handling {label}")
|
||||
}
|
||||
|
||||
static_assert_size!(JsWorkerReply, 48);
|
||||
struct JsWorkerReply<T> {
|
||||
value: T,
|
||||
trapped: bool,
|
||||
}
|
||||
|
||||
/// A request for the worker in [`spawn_instance_worker`].
|
||||
// We care about optimizing for `CallReducer` as it happens frequently,
|
||||
// so we don't want to box anything in it.
|
||||
type JsReplyTx<T> = oneshot::Sender<JsWorkerReply<T>>;
|
||||
|
||||
/// Requests sent to the dedicated JS worker thread.
|
||||
///
|
||||
/// Most variants carry a `reply_tx` because the worker thread owns the isolate,
|
||||
/// executes the request there, and then has to send both the typed result and
|
||||
/// the worker's trapped-bit back to the async caller.
|
||||
enum JsWorkerRequest {
|
||||
/// See [`JsInstance::run_on_thread`].
|
||||
///
|
||||
@@ -489,34 +530,327 @@ enum JsWorkerRequest {
|
||||
RunFunction(Box<dyn FnOnce() -> LocalBoxFuture<'static, ()> + Send>),
|
||||
/// See [`JsInstance::update_database`].
|
||||
UpdateDatabase {
|
||||
reply_tx: JsReplyTx<anyhow::Result<UpdateDatabaseResult>>,
|
||||
program: Program,
|
||||
old_module_info: Arc<ModuleInfo>,
|
||||
policy: MigrationPolicy,
|
||||
},
|
||||
/// See [`JsInstance::call_reducer`].
|
||||
CallReducer { params: CallReducerParams },
|
||||
CallReducer {
|
||||
reply_tx: JsReplyTx<ReducerCallResult>,
|
||||
params: CallReducerParams,
|
||||
},
|
||||
/// See [`JsInstance::call_view`].
|
||||
CallView { cmd: ViewCommand },
|
||||
CallView {
|
||||
reply_tx: JsReplyTx<ViewCommandResult>,
|
||||
cmd: ViewCommand,
|
||||
},
|
||||
/// See [`JsInstance::call_procedure`].
|
||||
CallProcedure { params: CallProcedureParams },
|
||||
CallProcedure {
|
||||
reply_tx: JsReplyTx<CallProcedureReturn>,
|
||||
params: CallProcedureParams,
|
||||
},
|
||||
/// See [`JsInstance::clear_all_clients`].
|
||||
ClearAllClients,
|
||||
ClearAllClients(JsReplyTx<anyhow::Result<()>>),
|
||||
/// See [`JsInstance::call_identity_connected`].
|
||||
CallIdentityConnected(ConnectionAuthCtx, ConnectionId),
|
||||
CallIdentityConnected {
|
||||
reply_tx: JsReplyTx<Result<(), ClientConnectedError>>,
|
||||
caller_auth: ConnectionAuthCtx,
|
||||
caller_connection_id: ConnectionId,
|
||||
},
|
||||
/// See [`JsInstance::call_identity_disconnected`].
|
||||
CallIdentityDisconnected(Identity, ConnectionId),
|
||||
CallIdentityDisconnected {
|
||||
reply_tx: JsReplyTx<Result<(), ReducerCallError>>,
|
||||
caller_identity: Identity,
|
||||
caller_connection_id: ConnectionId,
|
||||
},
|
||||
/// See [`JsInstance::disconnect_client`].
|
||||
DisconnectClient(ClientActorId),
|
||||
DisconnectClient {
|
||||
reply_tx: JsReplyTx<Result<(), ReducerCallError>>,
|
||||
client_id: ClientActorId,
|
||||
},
|
||||
/// See [`JsInstance::init_database`].
|
||||
InitDatabase(Program),
|
||||
InitDatabase {
|
||||
reply_tx: JsReplyTx<anyhow::Result<Option<ReducerCallResult>>>,
|
||||
program: Program,
|
||||
},
|
||||
/// See [`JsInstance::call_scheduled_function`].
|
||||
CallScheduledFunction(ScheduledFunctionParams),
|
||||
CallScheduledFunction {
|
||||
reply_tx: JsReplyTx<CallScheduledFunctionResult>,
|
||||
params: ScheduledFunctionParams,
|
||||
},
|
||||
}
|
||||
|
||||
// These two should be the same size (once core pinning PR lands).
|
||||
static_assert_size!(JsWorkerRequest, 192);
|
||||
static_assert_size!(CallReducerParams, 192);
|
||||
|
||||
fn send_worker_reply<T>(ctx: &str, reply_tx: JsReplyTx<T>, value: T, trapped: bool) {
|
||||
if reply_tx.send(JsWorkerReply { value, trapped }).is_err() {
|
||||
log::error!("should have receiver for `{ctx}` response");
|
||||
}
|
||||
}
|
||||
|
||||
struct JsInstanceLaneState {
|
||||
// Instance-lane calls stay on one active worker for locality. The hot path clones
|
||||
// this handle and feeds work straight into the worker-owned FIFO; trap
|
||||
// recovery very rarely swaps it out for a fresh instance.
|
||||
active: RwLock<JsInstance>,
|
||||
|
||||
// Replacement must be serialized because multiple callers can all observe
|
||||
// the same trapped worker and attempt recovery at once.
|
||||
//
|
||||
// This must be an async mutex rather than a blocking mutex because recovery
|
||||
// may need to call `create_instance().await`.
|
||||
//
|
||||
// This stays safe as long as these invariants hold:
|
||||
// - `replace_lock` is only for trap/disconnect recovery, never the hot path.
|
||||
// - no `parking_lot` guard is held across the `.await` in replacement.
|
||||
// - `create_instance()` must not call back into `JsInstanceLane` or try to
|
||||
// take `replace_lock`.
|
||||
replace_lock: AsyncMutex<()>,
|
||||
}
|
||||
|
||||
/// A single serialized execution lane for JS module work.
|
||||
///
|
||||
/// Callers share one active [`JsInstance`] so hot requests stay on the same
|
||||
/// worker thread for locality. The lane only steps in on the rare path, where
|
||||
/// a trap or disconnect forces that active worker to be replaced.
|
||||
#[derive(Clone)]
|
||||
pub struct JsInstanceLane {
|
||||
module: JsModule,
|
||||
state: Arc<JsInstanceLaneState>,
|
||||
}
|
||||
|
||||
impl JsInstanceLane {
|
||||
pub fn new(module: JsModule, init_inst: JsInstance) -> Self {
|
||||
Self {
|
||||
module,
|
||||
state: Arc::new(JsInstanceLaneState {
|
||||
active: RwLock::new(init_inst),
|
||||
replace_lock: AsyncMutex::new(()),
|
||||
}),
|
||||
}
|
||||
}
|
||||
|
||||
fn active_instance(&self) -> JsInstance {
|
||||
self.state.active.read().clone()
|
||||
}
|
||||
|
||||
async fn after_successful_call(&self, active: &JsInstance) {
|
||||
if active.trapped() {
|
||||
self.replace_active_if_current(active).await;
|
||||
}
|
||||
}
|
||||
|
||||
async fn replace_active_if_current(&self, trapped: &JsInstance) {
|
||||
// `replace_lock` intentionally serializes the rare recovery path. This
|
||||
// prevents a trap observed by many callers from spawning many replacement
|
||||
// workers and racing to install them.
|
||||
let _replace_guard = self.state.replace_lock.lock().await;
|
||||
|
||||
// The same trapped instance can be observed by multiple callers at once.
|
||||
// We only want the first one to do the swap; everybody else should notice
|
||||
// that the active handle already changed and get out of the way.
|
||||
if self.state.active.read().id() != trapped.id() {
|
||||
return;
|
||||
}
|
||||
|
||||
log::warn!("instance lane worker trapped; creating a fresh instance-lane worker");
|
||||
|
||||
// Keep the awaited instance creation outside of any `parking_lot` guard.
|
||||
// The only lock held across this await is `replace_lock`, which is why it
|
||||
// has to be async.
|
||||
let next = self.module.create_instance().await;
|
||||
*self.state.active.write() = next;
|
||||
}
|
||||
|
||||
/// Run an instance-lane operation exactly once.
|
||||
///
|
||||
/// If the worker disappears before replying, we replace it for future
|
||||
/// requests but surface the disconnect to the caller instead of retrying.
|
||||
/// This keeps instance-lane semantics closer to the old pooled-instance
|
||||
/// model now that the worker queue is a rendezvous channel.
|
||||
async fn run_once<R>(
|
||||
&self,
|
||||
label: &'static str,
|
||||
work: impl AsyncFnOnce(JsInstance) -> Result<R, WorkerDisconnected>,
|
||||
) -> Result<R, WorkerDisconnected> {
|
||||
assert_not_on_js_module_thread(label);
|
||||
|
||||
let active = self.active_instance();
|
||||
let result = work(active.clone()).await;
|
||||
match result {
|
||||
Ok(value) => {
|
||||
self.after_successful_call(&active).await;
|
||||
Ok(value)
|
||||
}
|
||||
Err(err) => {
|
||||
self.replace_active_if_current(&active).await;
|
||||
log::error!("instance-lane operation {label} lost its worker before replying");
|
||||
Err(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Run an arbitrary closure on the instance-lane worker thread without replay.
|
||||
///
|
||||
/// This is non-replayable because the closure is opaque host code, not a
|
||||
/// cloneable request payload, and it may have already produced host-side
|
||||
/// effects before a worker disconnect is observed.
|
||||
pub async fn run_on_thread<F, R>(&self, f: F) -> anyhow::Result<R>
|
||||
where
|
||||
F: AsyncFnOnce() -> R + Send + 'static,
|
||||
R: Send + 'static,
|
||||
{
|
||||
let span = tracing::Span::current();
|
||||
self.run_once("run_on_thread", async move |inst| {
|
||||
let (tx, rx) = oneshot::channel();
|
||||
|
||||
inst.request_tx
|
||||
.send_async(JsWorkerRequest::RunFunction(Box::new(move || {
|
||||
async move {
|
||||
let _on_js_module_thread = EnteredJsModuleThread::new();
|
||||
let result = AssertUnwindSafe(f().instrument(span)).catch_unwind().await;
|
||||
if let Err(Err(_panic)) = tx.send(result) {
|
||||
tracing::warn!("uncaught panic on `SingleCoreExecutor`")
|
||||
}
|
||||
}
|
||||
.boxed_local()
|
||||
})))
|
||||
.await
|
||||
.map_err(|_| WorkerDisconnected)?;
|
||||
|
||||
Ok(match rx.await.map_err(|_| WorkerDisconnected)? {
|
||||
Ok(r) => r,
|
||||
Err(e) => std::panic::resume_unwind(e),
|
||||
})
|
||||
})
|
||||
.await
|
||||
.map_err(|_| anyhow::anyhow!("instance lane worker exited while running a non-replayable module-thread task"))
|
||||
}
|
||||
|
||||
/// Run a database update on the instance lane exactly once.
|
||||
///
|
||||
/// If the worker disappears before replying, we replace it for future
|
||||
/// requests and surface an internal host error to the caller.
|
||||
pub async fn update_database(
|
||||
&self,
|
||||
program: Program,
|
||||
old_module_info: Arc<ModuleInfo>,
|
||||
policy: MigrationPolicy,
|
||||
) -> anyhow::Result<UpdateDatabaseResult> {
|
||||
self.run_once("update_database", |inst: JsInstance| async move {
|
||||
inst.send_request(|reply_tx| JsWorkerRequest::UpdateDatabase {
|
||||
reply_tx,
|
||||
program,
|
||||
old_module_info,
|
||||
policy,
|
||||
})
|
||||
.await
|
||||
})
|
||||
.await
|
||||
.map_err(|_| anyhow::anyhow!(instance_lane_worker_error("update_database")))?
|
||||
}
|
||||
|
||||
/// Run a reducer on the instance lane exactly once.
|
||||
///
|
||||
/// A real reducer trap still returns the reducer's own outcome before the
|
||||
/// worker is replaced. If the worker disappears before any reply, we surface
|
||||
/// that as `ReducerCallError::WorkerError`.
|
||||
pub async fn call_reducer(&self, params: CallReducerParams) -> Result<ReducerCallResult, ReducerCallError> {
|
||||
self.run_once("call_reducer", |inst: JsInstance| async move {
|
||||
inst.send_request(|reply_tx| JsWorkerRequest::CallReducer { reply_tx, params })
|
||||
.await
|
||||
})
|
||||
.await
|
||||
.map_err(|_| ReducerCallError::WorkerError(instance_lane_worker_error("call_reducer")))
|
||||
}
|
||||
|
||||
/// Clear all instance-lane client state exactly once.
|
||||
pub async fn clear_all_clients(&self) -> anyhow::Result<()> {
|
||||
self.run_once("clear_all_clients", |inst: JsInstance| async move {
|
||||
inst.send_request(JsWorkerRequest::ClearAllClients).await
|
||||
})
|
||||
.await
|
||||
.map_err(|_| anyhow::anyhow!(instance_lane_worker_error("clear_all_clients")))?
|
||||
}
|
||||
|
||||
/// Run the `client_connected` lifecycle reducer exactly once.
|
||||
///
|
||||
/// If the worker disappears before replying, we replace it for future
|
||||
/// requests and reject the connection with `ReducerCallError::WorkerError`.
|
||||
pub async fn call_identity_connected(
|
||||
&self,
|
||||
caller_auth: ConnectionAuthCtx,
|
||||
caller_connection_id: ConnectionId,
|
||||
) -> Result<(), ClientConnectedError> {
|
||||
self.run_once("call_identity_connected", |inst: JsInstance| async move {
|
||||
inst.send_request(|reply_tx| JsWorkerRequest::CallIdentityConnected {
|
||||
reply_tx,
|
||||
caller_auth,
|
||||
caller_connection_id,
|
||||
})
|
||||
.await
|
||||
})
|
||||
.await
|
||||
.map_err(|_| {
|
||||
ClientConnectedError::from(ReducerCallError::WorkerError(instance_lane_worker_error(
|
||||
"call_identity_connected",
|
||||
)))
|
||||
})?
|
||||
}
|
||||
|
||||
/// Run the `client_disconnected` lifecycle reducer exactly once.
|
||||
pub async fn call_identity_disconnected(
|
||||
&self,
|
||||
caller_identity: Identity,
|
||||
caller_connection_id: ConnectionId,
|
||||
) -> Result<(), ReducerCallError> {
|
||||
self.run_once("call_identity_disconnected", |inst: JsInstance| async move {
|
||||
inst.send_request(|reply_tx| JsWorkerRequest::CallIdentityDisconnected {
|
||||
reply_tx,
|
||||
caller_identity,
|
||||
caller_connection_id,
|
||||
})
|
||||
.await
|
||||
})
|
||||
.await
|
||||
.map_err(|_| ReducerCallError::WorkerError(instance_lane_worker_error("call_identity_disconnected")))?
|
||||
}
|
||||
|
||||
/// Run disconnect cleanup on the instance lane exactly once.
|
||||
pub async fn disconnect_client(&self, client_id: ClientActorId) -> Result<(), ReducerCallError> {
|
||||
self.run_once("disconnect_client", |inst: JsInstance| async move {
|
||||
inst.send_request(|reply_tx| JsWorkerRequest::DisconnectClient { reply_tx, client_id })
|
||||
.await
|
||||
})
|
||||
.await
|
||||
.map_err(|_| ReducerCallError::WorkerError(instance_lane_worker_error("disconnect_client")))?
|
||||
}
|
||||
|
||||
/// Run reducer-style database initialization exactly once.
|
||||
pub async fn init_database(&self, program: Program) -> anyhow::Result<Option<ReducerCallResult>> {
|
||||
self.run_once("init_database", |inst: JsInstance| async move {
|
||||
inst.send_request(|reply_tx| JsWorkerRequest::InitDatabase { reply_tx, program })
|
||||
.await
|
||||
})
|
||||
.await
|
||||
.map_err(|_| anyhow::anyhow!(instance_lane_worker_error("init_database")))?
|
||||
}
|
||||
|
||||
/// Run a view/subscription command on the instance lane exactly once.
|
||||
///
|
||||
/// If the worker disappears before replying, we replace it for future
|
||||
/// requests and surface a `ViewCallError::InternalError`.
|
||||
pub async fn call_view(&self, cmd: ViewCommand) -> Result<ViewCommandResult, ViewCallError> {
|
||||
self.run_once("call_view", |inst: JsInstance| async move {
|
||||
inst.send_request(|reply_tx| JsWorkerRequest::CallView { reply_tx, cmd })
|
||||
.await
|
||||
})
|
||||
.await
|
||||
.map_err(|_| ViewCallError::InternalError(instance_lane_worker_error("call_view")))
|
||||
}
|
||||
}
|
||||
|
||||
/// Performs some of the startup work of [`spawn_instance_worker`].
|
||||
///
|
||||
/// NOTE(centril): in its own function due to lack of `try` blocks.
|
||||
@@ -575,13 +909,11 @@ async fn spawn_instance_worker(
|
||||
load_balance_guard: Arc<LoadBalanceOnDropGuard>,
|
||||
mut core_pinner: CorePinner,
|
||||
) -> anyhow::Result<(ModuleCommon, JsInstance)> {
|
||||
// Spawn channels for bidirectional communication between worker and instance.
|
||||
// The use-case is SPSC and all channels are rendezvous channels
|
||||
// where each `.send` blocks until it's received.
|
||||
// The Instance --Request-> Worker channel:
|
||||
// Spawn a rendezvous queue for requests to the worker.
|
||||
// Multiple callers can wait to hand work to the worker, but with
|
||||
// `bounded(0)` there is no buffered backlog inside the channel itself.
|
||||
// The worker still processes requests strictly one at a time.
|
||||
let (request_tx, request_rx) = flume::bounded(0);
|
||||
// The Worker --Reply-> Instance channel:
|
||||
let (reply_tx, reply_rx) = flume::bounded(0);
|
||||
|
||||
// This one-shot channel is used for initial startup error handling within the thread.
|
||||
let (result_tx, result_rx) = oneshot::channel();
|
||||
@@ -659,65 +991,63 @@ async fn spawn_instance_worker(
|
||||
|
||||
// Process requests to the worker.
|
||||
//
|
||||
// The loop is terminated when a `JsInstance` is dropped.
|
||||
// The loop is terminated when the last `JsInstance` handle is dropped.
|
||||
// This will cause channels, scopes, and the isolate to be cleaned up.
|
||||
let reply = |ctx: &str, reply: JsWorkerReply, trapped| {
|
||||
if let Err(e) = reply_tx.send((reply, trapped)) {
|
||||
// This should never happen as `JsInstance::$function` immediately
|
||||
// does `.recv` on the other end of the channel, though sometimes
|
||||
// it gets cancelled.
|
||||
log::error!("should have receiver for `{ctx}` response, {e}");
|
||||
}
|
||||
};
|
||||
for request in request_rx.iter() {
|
||||
let mut call_reducer = |tx, params| instance_common.call_reducer_with_tx(tx, params, &mut inst);
|
||||
let mut should_exit = false;
|
||||
|
||||
core_pinner.pin_if_changed();
|
||||
|
||||
use JsWorkerReply::*;
|
||||
match request {
|
||||
JsWorkerRequest::RunFunction(f) => rt.block_on(f()),
|
||||
JsWorkerRequest::UpdateDatabase {
|
||||
reply_tx,
|
||||
program,
|
||||
old_module_info,
|
||||
policy,
|
||||
} => {
|
||||
// Update the database and reply to `JsInstance::update_database`.
|
||||
let res = instance_common.update_database(program, old_module_info, policy, &mut inst);
|
||||
reply("update_database", UpdateDatabase(res), false);
|
||||
send_worker_reply("update_database", reply_tx, res, false);
|
||||
}
|
||||
JsWorkerRequest::CallReducer { params } => {
|
||||
// Call the reducer.
|
||||
// If execution trapped, we don't end the loop here,
|
||||
// but rather let this happen by `return_instance` using `JsInstance::trapped`
|
||||
// which will cause `JsInstance` to be dropped,
|
||||
// which in turn results in the loop being terminated.
|
||||
JsWorkerRequest::CallReducer { reply_tx, params } => {
|
||||
let (res, trapped) = call_reducer(None, params);
|
||||
reply("call_reducer", CallReducer(res), trapped);
|
||||
send_worker_reply("call_reducer", reply_tx, res, trapped);
|
||||
should_exit = trapped;
|
||||
}
|
||||
JsWorkerRequest::CallView { cmd } => {
|
||||
JsWorkerRequest::CallView { reply_tx, cmd } => {
|
||||
let (res, trapped) = instance_common.handle_cmd(cmd, &mut inst);
|
||||
reply("call_view", JsWorkerReply::CallView(res.into()), trapped);
|
||||
send_worker_reply("call_view", reply_tx, res, trapped);
|
||||
should_exit = trapped;
|
||||
}
|
||||
JsWorkerRequest::CallProcedure { params } => {
|
||||
JsWorkerRequest::CallProcedure { reply_tx, params } => {
|
||||
let (res, trapped) = instance_common
|
||||
.call_procedure(params, &mut inst)
|
||||
.now_or_never()
|
||||
.expect("our call_procedure implementation is not actually async");
|
||||
|
||||
reply("call_procedure", JsWorkerReply::CallProcedure(res.into()), trapped);
|
||||
send_worker_reply("call_procedure", reply_tx, res, trapped);
|
||||
should_exit = trapped;
|
||||
}
|
||||
JsWorkerRequest::ClearAllClients => {
|
||||
JsWorkerRequest::ClearAllClients(reply_tx) => {
|
||||
let res = instance_common.clear_all_clients();
|
||||
reply("clear_all_clients", ClearAllClients(res), false);
|
||||
send_worker_reply("clear_all_clients", reply_tx, res, false);
|
||||
}
|
||||
JsWorkerRequest::CallIdentityConnected(caller_auth, caller_connection_id) => {
|
||||
JsWorkerRequest::CallIdentityConnected {
|
||||
reply_tx,
|
||||
caller_auth,
|
||||
caller_connection_id,
|
||||
} => {
|
||||
let mut trapped = false;
|
||||
let res =
|
||||
call_identity_connected(caller_auth, caller_connection_id, info, call_reducer, &mut trapped);
|
||||
reply("call_identity_connected", CallIdentityConnected(res), trapped);
|
||||
send_worker_reply("call_identity_connected", reply_tx, res, trapped);
|
||||
should_exit = trapped;
|
||||
}
|
||||
JsWorkerRequest::CallIdentityDisconnected(caller_identity, caller_connection_id) => {
|
||||
JsWorkerRequest::CallIdentityDisconnected {
|
||||
reply_tx,
|
||||
caller_identity,
|
||||
caller_connection_id,
|
||||
} => {
|
||||
let mut trapped = false;
|
||||
let res = ModuleHost::call_identity_disconnected_inner(
|
||||
caller_identity,
|
||||
@@ -726,26 +1056,38 @@ async fn spawn_instance_worker(
|
||||
call_reducer,
|
||||
&mut trapped,
|
||||
);
|
||||
reply("call_identity_disconnected", CallIdentityDisconnected(res), trapped);
|
||||
send_worker_reply("call_identity_disconnected", reply_tx, res, trapped);
|
||||
should_exit = trapped;
|
||||
}
|
||||
JsWorkerRequest::DisconnectClient(client_id) => {
|
||||
JsWorkerRequest::DisconnectClient { reply_tx, client_id } => {
|
||||
let mut trapped = false;
|
||||
let res = ModuleHost::disconnect_client_inner(client_id, info, call_reducer, &mut trapped);
|
||||
reply("disconnect_client", DisconnectClient(res), trapped);
|
||||
send_worker_reply("disconnect_client", reply_tx, res, trapped);
|
||||
should_exit = trapped;
|
||||
}
|
||||
JsWorkerRequest::InitDatabase(program) => {
|
||||
JsWorkerRequest::InitDatabase { reply_tx, program } => {
|
||||
let (res, trapped): (Result<Option<ReducerCallResult>, anyhow::Error>, bool) =
|
||||
init_database(replica_ctx, &module_common.info().module_def, program, call_reducer);
|
||||
reply("init_database", InitDatabase(Box::new(res)), trapped);
|
||||
send_worker_reply("init_database", reply_tx, res, trapped);
|
||||
should_exit = trapped;
|
||||
}
|
||||
JsWorkerRequest::CallScheduledFunction(params) => {
|
||||
JsWorkerRequest::CallScheduledFunction { reply_tx, params } => {
|
||||
let (res, trapped) = instance_common
|
||||
.call_scheduled_function(params, &mut inst)
|
||||
.now_or_never()
|
||||
.expect("our call_procedure implementation is not actually async");
|
||||
reply("call_scheduled_function", CallScheduledFunction(res), trapped);
|
||||
send_worker_reply("call_scheduled_function", reply_tx, res, trapped);
|
||||
should_exit = trapped;
|
||||
}
|
||||
}
|
||||
|
||||
// Once a JS instance traps, we must not let later queued work execute
|
||||
// on that poisoned isolate. We reply to the trapping request first so
|
||||
// the caller can observe the actual reducer/procedure error, and then
|
||||
// shut the worker down so later callers retry on a fresh instance.
|
||||
if should_exit {
|
||||
break;
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
@@ -753,9 +1095,9 @@ async fn spawn_instance_worker(
|
||||
let res: Result<ModuleCommon, anyhow::Error> = result_rx.await.expect("should have a sender");
|
||||
res.map(|opt_mc| {
|
||||
let inst = JsInstance {
|
||||
id: NEXT_JS_INSTANCE_ID.fetch_add(1, Ordering::Relaxed),
|
||||
request_tx,
|
||||
reply_rx,
|
||||
trapped: false,
|
||||
trapped: Arc::new(AtomicBool::new(false)),
|
||||
};
|
||||
(opt_mc, inst)
|
||||
})
|
||||
|
||||
Reference in New Issue
Block a user