From cda1de551f7942d5858a078b97cbd3fbb141b584 Mon Sep 17 00:00:00 2001 From: Julien Lavocat Date: Tue, 5 May 2026 15:03:14 +0200 Subject: [PATCH] Allow procedures to yield outside of `with_tx()` --- .../core/src/host/wasmtime/wasm_instance_env.rs | 14 ++++++++++++++ crates/core/src/host/wasmtime/wasmtime_module.rs | 16 ++++++++++++++-- 2 files changed, 28 insertions(+), 2 deletions(-) diff --git a/crates/core/src/host/wasmtime/wasm_instance_env.rs b/crates/core/src/host/wasmtime/wasm_instance_env.rs index 7c0b2a088..592b2a52e 100644 --- a/crates/core/src/host/wasmtime/wasm_instance_env.rs +++ b/crates/core/src/host/wasmtime/wasm_instance_env.rs @@ -133,6 +133,9 @@ pub(super) struct WasmInstanceEnv { /// A pool of unused allocated chunks that can be reused. // TODO(Centril): consider using this pool for `console_timer_start` and `bytes_sink_write`. chunk_pool: ChunkPool, + + /// Whether the current epoch can yield, used to prevent yielding in reducers. + epoch_can_yield: bool, } const STANDARD_BYTES_SINK: u32 = 1; @@ -158,6 +161,7 @@ impl WasmInstanceEnv { timing_spans: Default::default(), call_times: CallTimes::new(), chunk_pool: <_>::default(), + epoch_can_yield: false, } } @@ -244,6 +248,16 @@ impl WasmInstanceEnv { self.standard_bytes_sink.take().unwrap_or_default() } + /// Return whether the current epoch can yield. This should be `true` for procedures and `false` for reducers. + pub fn epoch_can_yield(&self) -> bool { + self.epoch_can_yield + } + + /// Set whether the current epoch can yield. This should be `true` for procedures and `false` for reducers. + pub fn set_epoch_can_yield(&mut self, can_yield: bool) { + self.epoch_can_yield = can_yield; + } + /// Signal to this `WasmInstanceEnv` that a reducer or procedure call is beginning. /// /// Returns the handle used by reducers and procedures to read from `args` diff --git a/crates/core/src/host/wasmtime/wasmtime_module.rs b/crates/core/src/host/wasmtime/wasmtime_module.rs index 0690120eb..e415a2f3b 100644 --- a/crates/core/src/host/wasmtime/wasmtime_module.rs +++ b/crates/core/src/host/wasmtime/wasmtime_module.rs @@ -220,13 +220,23 @@ impl module_host_actor::WasmInstancePre for WasmtimeModule { store.epoch_deadline_callback(|store| { let env = store.data(); - let database = env.instance_env().replica_ctx.database_identity; + let instance_env = env.instance_env(); + let database = instance_env.replica_ctx.database_identity; let funcall = env.log_record_function().unwrap_or_default(); let dur = env.funcall_start().elapsed(); // TODO(procedure-timing): This measurement is not super meaningful for procedures, // which may (will) suspend execution and therefore may not have been continuously running since `env.funcall_start`. tracing::warn!(funcall, ?database, "Wasm has been running for {dur:?}"); - Ok(wasmtime::UpdateDeadline::Continue(EPOCH_TICKS_PER_SECOND)) + + // Only yield if we're currently executing procedure code outside of `with_tx()` + // reducers and views should never yield + let should_yield = env.epoch_can_yield() && !instance_env.in_tx(); + + if should_yield { + Ok(wasmtime::UpdateDeadline::Yield(EPOCH_TICKS_PER_SECOND)) + } else { + Ok(wasmtime::UpdateDeadline::Continue(EPOCH_TICKS_PER_SECOND)) + } }); // Note: this budget is just for initializers @@ -593,6 +603,7 @@ impl module_host_actor::WasmInstance for WasmtimeInstance { }; return (res, None); }; + store.data_mut().set_epoch_can_yield(true); let call_result = call_procedure .call_async( &mut *store, @@ -610,6 +621,7 @@ impl module_host_actor::WasmInstance for WasmtimeInstance { ), ) .await; + store.data_mut().set_epoch_can_yield(false); // Close any ongoing anonymous transactions by aborting them, // in case of improper ABI use (start but no commit/abort).