Allow procedures to yield outside of with_tx()

This commit is contained in:
Julien Lavocat
2026-05-05 15:03:14 +02:00
parent 06fa5e311c
commit cda1de551f
2 changed files with 28 additions and 2 deletions
@@ -133,6 +133,9 @@ pub(super) struct WasmInstanceEnv {
/// A pool of unused allocated chunks that can be reused.
// TODO(Centril): consider using this pool for `console_timer_start` and `bytes_sink_write`.
chunk_pool: ChunkPool,
/// Whether the current epoch can yield, used to prevent yielding in reducers.
epoch_can_yield: bool,
}
const STANDARD_BYTES_SINK: u32 = 1;
@@ -158,6 +161,7 @@ impl WasmInstanceEnv {
timing_spans: Default::default(),
call_times: CallTimes::new(),
chunk_pool: <_>::default(),
epoch_can_yield: false,
}
}
@@ -244,6 +248,16 @@ impl WasmInstanceEnv {
self.standard_bytes_sink.take().unwrap_or_default()
}
/// Return whether the current epoch can yield. This should be `true` for procedures and `false` for reducers.
pub fn epoch_can_yield(&self) -> bool {
self.epoch_can_yield
}
/// Set whether the current epoch can yield. This should be `true` for procedures and `false` for reducers.
pub fn set_epoch_can_yield(&mut self, can_yield: bool) {
self.epoch_can_yield = can_yield;
}
/// Signal to this `WasmInstanceEnv` that a reducer or procedure call is beginning.
///
/// Returns the handle used by reducers and procedures to read from `args`
@@ -220,13 +220,23 @@ impl module_host_actor::WasmInstancePre for WasmtimeModule {
store.epoch_deadline_callback(|store| {
let env = store.data();
let database = env.instance_env().replica_ctx.database_identity;
let instance_env = env.instance_env();
let database = instance_env.replica_ctx.database_identity;
let funcall = env.log_record_function().unwrap_or_default();
let dur = env.funcall_start().elapsed();
// TODO(procedure-timing): This measurement is not super meaningful for procedures,
// which may (will) suspend execution and therefore may not have been continuously running since `env.funcall_start`.
tracing::warn!(funcall, ?database, "Wasm has been running for {dur:?}");
Ok(wasmtime::UpdateDeadline::Continue(EPOCH_TICKS_PER_SECOND))
// Only yield if we're currently executing procedure code outside of `with_tx()`
// reducers and views should never yield
let should_yield = env.epoch_can_yield() && !instance_env.in_tx();
if should_yield {
Ok(wasmtime::UpdateDeadline::Yield(EPOCH_TICKS_PER_SECOND))
} else {
Ok(wasmtime::UpdateDeadline::Continue(EPOCH_TICKS_PER_SECOND))
}
});
// Note: this budget is just for initializers
@@ -593,6 +603,7 @@ impl module_host_actor::WasmInstance for WasmtimeInstance {
};
return (res, None);
};
store.data_mut().set_epoch_can_yield(true);
let call_result = call_procedure
.call_async(
&mut *store,
@@ -610,6 +621,7 @@ impl module_host_actor::WasmInstance for WasmtimeInstance {
),
)
.await;
store.data_mut().set_epoch_can_yield(false);
// Close any ongoing anonymous transactions by aborting them,
// in case of improper ABI use (start but no commit/abort).