slim down DST to a datastore-focused scope

This commit is contained in:
Shubham Mishra
2026-05-05 13:01:28 +05:30
parent 831e9f2db9
commit 7282b9b1d7
27 changed files with 608 additions and 811 deletions
Generated
+4 -12
View File
@@ -8171,7 +8171,6 @@ dependencies = [
"itertools 0.12.1",
"lazy_static",
"log",
"madsim-tokio",
"memchr",
"nix 0.30.1",
"nohash-hasher",
@@ -8235,6 +8234,7 @@ dependencies = [
"thiserror 1.0.69",
"tikv-jemalloc-ctl",
"tikv-jemallocator",
"tokio",
"tokio-metrics",
"tokio-stream",
"tokio-util",
@@ -8310,25 +8310,17 @@ name = "spacetimedb-dst"
version = "2.2.0"
dependencies = [
"anyhow",
"bytes",
"async-task",
"clap 4.5.50",
"futures-util",
"madsim",
"madsim-tokio",
"spacetimedb-cli",
"spacetimedb-client-api",
"spacetimedb-client-api-messages",
"spacetimedb-commitlog",
"spacetimedb-core",
"spacetimedb-datastore",
"spacetimedb-durability",
"spacetimedb-execution",
"spacetimedb-lib 2.2.0",
"spacetimedb-paths",
"spacetimedb-primitives 2.2.0",
"spacetimedb-sats 2.2.0",
"spacetimedb-schema",
"spacetimedb-standalone",
"spacetimedb-table",
"tracing",
"tracing-subscriber",
@@ -8343,7 +8335,6 @@ dependencies = [
"futures",
"itertools 0.12.1",
"log",
"madsim-tokio",
"scopeguard",
"spacetimedb-commitlog",
"spacetimedb-fs-utils",
@@ -8351,6 +8342,7 @@ dependencies = [
"spacetimedb-sats 2.2.0",
"tempfile",
"thiserror 1.0.69",
"tokio",
"tracing",
]
@@ -8418,7 +8410,7 @@ dependencies = [
name = "spacetimedb-io"
version = "2.2.0"
dependencies = [
"madsim-tokio",
"tokio",
]
[[package]]
+1 -1
View File
@@ -106,7 +106,7 @@ tempfile.workspace = true
thiserror.workspace = true
thin-vec.workspace = true
tokio-util.workspace = true
tokio = { package = "madsim-tokio", path = "../../../../madsim/madsim-tokio", features = ["full"] }
tokio.workspace = true
tokio-stream = { workspace = true, features = ["sync"] }
tokio-metrics = { version = "0.4.0", features = ["rt"] }
toml.workspace = true
+1
View File
@@ -18,6 +18,7 @@ pub mod estimation;
pub mod host;
pub mod module_host_context;
pub mod replica_context;
pub mod runtime;
pub mod startup;
pub mod subscription;
pub mod util;
+13
View File
@@ -0,0 +1,13 @@
//! Opaque runtime boundary for crates that should not depend on Tokio directly.
pub type Handle = tokio::runtime::Handle;
pub type Runtime = tokio::runtime::Runtime;

/// Reuse the ambient runtime when one is running; otherwise create a fresh
/// runtime and hand back its handle plus the owning guard.
///
/// The `Option<Runtime>` is `Some` only when this call created the runtime;
/// the caller must keep that guard alive for as long as the handle is used.
pub fn current_handle_or_new_runtime() -> anyhow::Result<(Handle, Option<Runtime>)> {
    match Handle::try_current() {
        // Already inside a runtime: borrow its handle, nothing to own.
        Ok(handle) => Ok((handle, None)),
        // No ambient runtime: build one and return it as a keep-alive guard.
        Err(_) => {
            let runtime = Runtime::new()?;
            let handle = runtime.handle().clone();
            Ok((handle, Some(runtime)))
        }
    }
}
+1 -9
View File
@@ -16,25 +16,17 @@ bench = false
[dependencies]
anyhow.workspace = true
async-task = "4.4"
clap.workspace = true
futures-util.workspace = true
tokio = { package = "madsim-tokio", path = "../../../../madsim/madsim-tokio", features = ["full"] }
bytes.workspace = true
spacetimedb-cli.workspace = true
spacetimedb-client-api.workspace = true
spacetimedb-client-api-messages.workspace = true
spacetimedb-datastore = { workspace = true, features = ["test"] }
spacetimedb_core = { package = "spacetimedb-core", path = "../core", version = "=2.2.0" }
spacetimedb-commitlog = { workspace = true, features = ["test"] }
spacetimedb_durability = { package = "spacetimedb-durability", path = "../durability", version = "=2.2.0" }
spacetimedb-execution.workspace = true
spacetimedb-lib.workspace = true
spacetimedb-paths.workspace = true
spacetimedb-primitives.workspace = true
spacetimedb-sats.workspace = true
spacetimedb-schema = { workspace = true, features = ["test"] }
spacetimedb-standalone.workspace = true
spacetimedb-table.workspace = true
tracing.workspace = true
tracing-subscriber.workspace = true
madsim = { path = "../../../../madsim/madsim" }
+9 -25
View File
@@ -53,10 +53,6 @@ replication traffic. Targets translate those IDs into their own handles:
- `relational-db-commitlog` maps `SessionId` to direct write/read transaction
slots.
- `standalone-host` currently maps `SessionId::ZERO` to its host
`ClientConnection`; reducer interactions already carry the logical session so
multi-session host workloads can be added without changing the interaction
shape again.
- future replication targets can map `SessionId` plus endpoint/node IDs to a
client connection routed through the simulated network.
@@ -75,8 +71,7 @@ DST workloads use three building blocks:
`table_ops` is the base table-transaction workload. `commitlog_ops` composes it
and injects durability lifecycle operations such as sync, close/reopen, dynamic
table create/migrate/drop, and replay checks. `module_ops` drives standalone
host/module interactions.
table create/migrate/drop, and replay checks.
Use this rule of thumb:
@@ -116,10 +111,6 @@ or properties to trust generator-provided expectations.
- `relational-db-commitlog`: runs table and commitlog lifecycle interactions
against `RelationalDB`, local durability, dynamic schema operations,
close/reopen, and replay-from-history checks.
- `standalone-host`: runs generated module interactions against a standalone
host environment.
Both targets reuse shared workload families and the same streaming runner.
## Properties
@@ -144,7 +135,7 @@ Current property families include:
## Fault Injection
`relational-db-commitlog` can wrap the in-memory commitlog repo in
`BuggifiedRepo`. Fault decisions are deterministic in simulation runs and
`BuggifiedRepo`. Fault decisions are deterministic from the run seed and
summarized in the final outcome.
Profiles:
@@ -169,25 +160,18 @@ Scenario examples:
```bash
cargo run -p spacetimedb-dst -- run --target relational-db-commitlog --scenario banking --duration 5m
cargo run -p spacetimedb-dst -- run --target relational-db-commitlog --scenario indexed-ranges --duration 5m
cargo run -p spacetimedb-dst -- run --target standalone-host --scenario host-smoke --max-interactions 100
```
madsim-backed simulation run with commitlog faults:
Run with commitlog faults:
```bash
RUSTFLAGS='--cfg madsim' cargo run -p spacetimedb-dst -- run \
cargo run -p spacetimedb-dst -- run \
--target relational-db-commitlog \
--seed 42 \
--max-interactions 400 \
--commitlog-fault-profile default
```
`--cfg madsim` is still the switch that enables madsim-tokio. Do not pass
`--cfg simulation` directly: that only enables SpacetimeDB's cfg gates and leaves
the madsim dependency in its normal Tokio/std mode. The workspace crates derive
`cfg(simulation)` from `cfg(madsim)` so SpacetimeDB source code does not need
provider-specific cfg gates.
Trace every interaction:
```bash
@@ -212,6 +196,7 @@ Start here:
- `src/workload/table_ops`: table interaction language, generation model, and
scenarios.
- `src/workload/commitlog_ops`: lifecycle layer over table workloads.
- `src/sim/`: local executor and deterministic-decision shim.
- `src/properties.rs`: property catalog and oracle/model checks.
- `src/targets/relational_db_commitlog.rs`: target adapter for RelationalDB,
commitlog durability, fault injection, close/reopen, and replay.
@@ -232,12 +217,11 @@ Start here:
- No shrinker yet; seed replay is the current reproduction mechanism.
- Sometimes-property reporting is still outcome-counter based, not a stable
property-event catalog.
- madsim backs the current deterministic runtime/fault hooks; deeper
host/network/filesystem simulation still needs explicit runtime and IO
boundaries.
- The local `sim` shim is not a real simulator yet. It owns executor setup and
deterministic fault decisions so future simulator work has one boundary.
- The current `RelationalDB` target drives open read snapshots to release before
starting writes, because beginning a write behind an open read snapshot can
block in this target shape. Interleaved read/write snapshot histories should
come back once the target models that lock behavior explicitly.
- Current simulation builds still expose runtime-boundary gaps, including
`spawn_blocking` call sites and randomized standard `HashMap` state warnings.
- Runtime-boundary work for scheduler, time, network, filesystem, and lower
randomness sources is still future work.
-10
View File
@@ -1,10 +0,0 @@
fn main() {
println!("cargo:rerun-if-env-changed=CARGO_CFG_MADSIM");
println!("cargo:rerun-if-env-changed=CARGO_CFG_SIMULATION");
println!("cargo:rerun-if-env-changed=CARGO_ENCODED_RUSTFLAGS");
println!("cargo:rerun-if-env-changed=RUSTFLAGS");
if std::env::var_os("CARGO_CFG_MADSIM").is_some() {
println!("cargo:rustc-cfg=simulation");
}
}
+20 -19
View File
@@ -233,31 +233,32 @@ mod tests {
}
}
#[tokio::test]
async fn not_crash_catches_execute_panic() {
assert_not_crash_error(PanicPhase::Execute, "execute_interaction", "execute panic").await;
#[test]
fn not_crash_catches_execute_panic() {
assert_not_crash_error(PanicPhase::Execute, "execute_interaction", "execute panic");
}
#[tokio::test]
async fn not_crash_catches_finish_panic() {
assert_not_crash_error(PanicPhase::Finish, "finish", "finish panic").await;
#[test]
fn not_crash_catches_finish_panic() {
assert_not_crash_error(PanicPhase::Finish, "finish", "finish panic");
}
#[tokio::test]
async fn not_crash_catches_collect_outcome_panic() {
assert_not_crash_error(PanicPhase::CollectOutcome, "collect_outcome", "collect panic").await;
#[test]
fn not_crash_catches_collect_outcome_panic() {
assert_not_crash_error(PanicPhase::CollectOutcome, "collect_outcome", "collect panic");
}
async fn assert_not_crash_error(phase: PanicPhase, expected_phase: &str, expected_payload: &str) {
let err = run_streaming(
SingleStepSource::new(),
PanicEngine::new(phase),
NoopProperties,
RunConfig::with_max_interactions(1),
)
.await
.unwrap_err()
.to_string();
fn assert_not_crash_error(phase: PanicPhase, expected_phase: &str, expected_payload: &str) {
let mut runtime = crate::sim::Runtime::new(crate::seed::DstSeed(0)).expect("runtime");
let err = runtime
.block_on(run_streaming(
SingleStepSource::new(),
PanicEngine::new(phase),
NoopProperties,
RunConfig::with_max_interactions(1),
))
.unwrap_err()
.to_string();
assert!(err.contains("[NotCrash]"));
assert!(err.contains(expected_phase));
+3 -8
View File
@@ -7,7 +7,7 @@
//! - [`properties`] for reusable semantic checks,
//! - [`seed`] for deterministic seeds,
//! - [`workload`] for scenario identifiers,
//! - [`targets`] for executable relational-db / standalone-host adapters.
//! - [`targets`] for the executable relational-db + commitlog adapter.
//!
//! ## DST principles
//!
@@ -32,13 +32,6 @@
//! 7. Shared randomness, weighting, and sampling helpers belong in the
//! workload strategy module, not in ad hoc target or scenario code.
#[cfg(all(simulation, not(madsim)))]
compile_error!(
"cfg(simulation) enables SpacetimeDB simulation gates, but madsim itself \
still requires cfg(madsim). Use RUSTFLAGS=\"--cfg madsim\" or ./run_dst.sh; \
SpacetimeDB crates derive cfg(simulation) from cfg(madsim)."
);
/// Logical client/session identifiers shared by workloads and targets.
pub mod client;
/// Shared run-budget configuration for DST targets.
@@ -50,6 +43,8 @@ pub(crate) mod properties;
mod schema;
/// Stable seed and RNG utilities used to make runs reproducible.
pub mod seed;
/// Local executor and deterministic-decision shim.
pub mod sim;
/// Concrete simulator targets.
pub mod targets;
/// Shared workload generators reused by multiple targets.
+8 -38
View File
@@ -1,14 +1,11 @@
use std::{
future::Future,
time::{SystemTime, UNIX_EPOCH},
};
use std::time::{SystemTime, UNIX_EPOCH};
use clap::{Args, Parser, Subcommand, ValueEnum};
use spacetimedb_dst::{
config::{CommitlogFaultProfile, RunConfig},
seed::DstSeed,
targets::descriptor::{RelationalDbCommitlogDescriptor, StandaloneHostDescriptor, TargetDescriptor},
workload::{module_ops::HostScenarioId, table_ops::TableScenarioId},
targets::descriptor::{RelationalDbCommitlogDescriptor, TargetDescriptor},
workload::table_ops::TableScenarioId,
};
#[derive(Parser, Debug)]
@@ -57,7 +54,6 @@ struct RunArgs {
#[derive(Copy, Clone, Debug, Eq, PartialEq, ValueEnum)]
enum TargetKind {
RelationalDbCommitlog,
StandaloneHost,
}
#[derive(Copy, Clone, Debug, Eq, PartialEq, ValueEnum)]
@@ -65,7 +61,6 @@ enum ScenarioKind {
RandomCrud,
IndexedRanges,
Banking,
HostSmoke,
}
#[derive(Copy, Clone, Debug, Eq, PartialEq, ValueEnum)]
@@ -120,10 +115,6 @@ fn run_command(args: RunArgs) -> anyhow::Result<()> {
let scenario = map_table_scenario(args.target.scenario)?;
run_prepared_target::<RelationalDbCommitlogDescriptor>(seed, scenario, config)
}
TargetKind::StandaloneHost => {
let scenario = map_host_scenario(args.target.scenario)?;
run_prepared_target::<StandaloneHostDescriptor>(seed, scenario, config)
}
}
}
@@ -133,25 +124,12 @@ fn run_prepared_target<D: TargetDescriptor>(
config: RunConfig,
) -> anyhow::Result<()> {
D::prepare(seed, &scenario, &config)?;
run_in_runtime(seed, run_target::<D>(seed, scenario, config))
}
#[cfg(all(simulation, madsim))]
fn run_in_runtime<F, T>(seed: DstSeed, future: F) -> anyhow::Result<T>
where
F: Future<Output = anyhow::Result<T>>,
{
let mut runtime = madsim::runtime::Runtime::with_seed_and_config(seed.0, madsim::Config::default());
let mut runtime = spacetimedb_dst::sim::Runtime::new(seed)?;
// RelationalDB durability still runs on core's production runtime boundary.
// Let those external tasks wake the DST executor while this target is being
// migrated toward a fully local simulator.
runtime.set_allow_system_thread(true);
runtime.block_on(future)
}
#[cfg(not(all(simulation, madsim)))]
fn run_in_runtime<F, T>(_seed: DstSeed, future: F) -> anyhow::Result<T>
where
F: Future<Output = anyhow::Result<T>>,
{
tokio::runtime::Runtime::new()?.block_on(future)
runtime.block_on(run_target::<D>(seed, scenario, config))
}
fn map_table_scenario(scenario: ScenarioKind) -> anyhow::Result<TableScenarioId> {
@@ -159,14 +137,6 @@ fn map_table_scenario(scenario: ScenarioKind) -> anyhow::Result<TableScenarioId>
ScenarioKind::RandomCrud => Ok(TableScenarioId::RandomCrud),
ScenarioKind::IndexedRanges => Ok(TableScenarioId::IndexedRanges),
ScenarioKind::Banking => Ok(TableScenarioId::Banking),
ScenarioKind::HostSmoke => anyhow::bail!("scenario host-smoke is only valid for --target standalone-host"),
}
}
fn map_host_scenario(scenario: ScenarioKind) -> anyhow::Result<HostScenarioId> {
match scenario {
ScenarioKind::HostSmoke => Ok(HostScenarioId::HostSmoke),
_ => anyhow::bail!("target standalone-host only supports --scenario host-smoke"),
}
}
+414
View File
@@ -0,0 +1,414 @@
//! Minimal asynchronous executor adapted from madsim's `sim/task` loop.
use std::{
collections::BTreeMap,
fmt,
future::Future,
panic::AssertUnwindSafe,
pin::Pin,
sync::{
atomic::{AtomicBool, Ordering},
Arc, Mutex,
},
task::{Context, Poll},
thread::{self, Thread},
time::Duration,
};
use futures_util::FutureExt;
use crate::{seed::DstSeed, sim::Rng};
type Runnable = async_task::Runnable<NodeId>;
/// A unique identifier for a simulated node.
/// A unique identifier for a simulated node.
#[derive(Clone, Copy, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub struct NodeId(u64);

impl NodeId {
    /// Id of the root node that hosts the top-level `block_on` future.
    pub const MAIN: Self = Self(0);
}

impl fmt::Display for NodeId {
    /// Render the node as its bare numeric id.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{}", self.0)
    }
}
/// A small single-threaded runtime for DST's top-level future.
///
/// Futures are scheduled as runnable tasks and the ready queue is sampled
/// with a deterministic RNG, so run order is reproducible from the seed.
/// If nothing is runnable and the root future is unfinished, execution is
/// treated as a test hang unless external system threads are explicitly
/// allowed for the current target.
pub struct Runtime {
    // Shared with `Handle` clones; the executor owns the run queue and nodes.
    executor: Arc<Executor>,
}

impl Runtime {
    /// Build a runtime whose scheduling decisions derive from `seed`.
    pub fn new(seed: DstSeed) -> anyhow::Result<Self> {
        Ok(Self {
            executor: Arc::new(Executor::new(seed)),
        })
    }

    /// Drive `future` to completion on the deterministic executor.
    pub fn block_on<F: Future>(&mut self, future: F) -> F::Output {
        self.executor.block_on(future)
    }

    /// Allow parking briefly for non-DST runtime threads to wake the root task.
    ///
    /// This is currently needed by the relational target while durability still
    /// uses core's production runtime boundary.
    pub fn set_allow_system_thread(&mut self, allowed: bool) {
        self.executor.set_allow_system_thread(allowed);
    }

    /// Cheap, cloneable access to the executor for spawning and node control.
    pub fn handle(&self) -> Handle {
        Handle {
            executor: Arc::clone(&self.executor),
        }
    }

    /// Register a new simulated node and return its id.
    pub fn create_node(&self) -> NodeId {
        self.handle().create_node()
    }

    /// Divert `node`'s runnables into its paused queue until resumed.
    pub fn pause(&self, node: NodeId) {
        self.handle().pause(node);
    }

    /// Re-enable `node` and reschedule runnables parked while it was paused.
    pub fn resume(&self, node: NodeId) {
        self.handle().resume(node);
    }

    /// Spawn `future` on `node`; panics if `node` was never created.
    pub fn spawn_on<F>(&self, node: NodeId, future: F) -> JoinHandle<F::Output>
    where
        F: Future + Send + 'static,
        F::Output: Send + 'static,
    {
        self.handle().spawn_on(node, future)
    }
}
/// Cloneable access to the simulation executor.
///
/// Handles delegate to the shared executor, so they can be moved into
/// simulated tasks to create nodes and spawn further work from inside a run.
#[derive(Clone)]
pub struct Handle {
    executor: Arc<Executor>,
}

impl Handle {
    /// Register a new simulated node and return its id.
    pub fn create_node(&self) -> NodeId {
        self.executor.create_node()
    }

    /// Divert `node`'s runnables into its paused queue until resumed.
    pub fn pause(&self, node: NodeId) {
        self.executor.pause(node);
    }

    /// Re-enable `node` and reschedule runnables parked while it was paused.
    pub fn resume(&self, node: NodeId) {
        self.executor.resume(node);
    }

    /// Spawn `future` onto `node`; panics if the node does not exist.
    pub fn spawn_on<F>(&self, node: NodeId, future: F) -> JoinHandle<F::Output>
    where
        F: Future + Send + 'static,
        F::Output: Send + 'static,
    {
        self.executor.spawn_on(node, future)
    }
}
/// A spawned simulated task; awaiting it yields the task's output.
pub struct JoinHandle<T> {
    task: async_task::Task<T, NodeId>,
}

impl<T> JoinHandle<T> {
    /// Let the task keep running without an owner.
    ///
    /// NOTE(review): with `async_task`, dropping an undetached `Task` cancels
    /// it — fire-and-forget callers must call `detach`; confirm intended.
    pub fn detach(self) {
        self.task.detach();
    }
}

impl<T> Future for JoinHandle<T> {
    type Output = T;

    // Delegates to the inner task; resolves once the spawned future finishes.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        Pin::new(&mut self.task).poll(cx)
    }
}
/// Core state of the deterministic single-threaded executor.
struct Executor {
    // Consumer half of the shared run queue.
    queue: Receiver,
    // Producer half; cloned into every task's wake path.
    sender: Sender,
    // Per-node pause state, keyed by node id.
    nodes: Mutex<BTreeMap<NodeId, Arc<NodeState>>>,
    // Next node id to hand out; MAIN (0) is pre-registered.
    next_node: std::sync::atomic::AtomicU64,
    // Seeded RNG that picks which ready runnable runs next.
    rng: Mutex<Rng>,
    // When true, `block_on` parks briefly instead of declaring a hang,
    // so external (non-DST) threads can wake the root task.
    allow_system_thread: AtomicBool,
}

impl Executor {
    /// Build an executor seeded from `seed`, with the MAIN node registered.
    fn new(seed: DstSeed) -> Self {
        let queue = Queue::new();
        let mut nodes = BTreeMap::new();
        nodes.insert(NodeId::MAIN, Arc::new(NodeState::default()));
        Self {
            queue: queue.receiver(),
            sender: queue.sender(),
            nodes: Mutex::new(nodes),
            next_node: std::sync::atomic::AtomicU64::new(1),
            rng: Mutex::new(Rng::new(seed)),
            allow_system_thread: AtomicBool::new(false),
        }
    }

    fn set_allow_system_thread(&self, allowed: bool) {
        self.allow_system_thread.store(allowed, Ordering::Relaxed);
    }

    /// Allocate a fresh node id and register its (unpaused) state.
    fn create_node(&self) -> NodeId {
        let id = NodeId(self.next_node.fetch_add(1, Ordering::Relaxed));
        self.nodes
            .lock()
            .expect("nodes poisoned")
            .insert(id, Arc::new(NodeState::default()));
        id
    }

    /// Mark `node` paused; its runnables are parked in `run_all_ready`.
    fn pause(&self, node: NodeId) {
        self.node_state(node).paused.store(true, Ordering::Relaxed);
    }

    /// Unpause `node` and move everything it parked back onto the run queue.
    fn resume(&self, node: NodeId) {
        let state = self.node_state(node);
        state.paused.store(false, Ordering::Relaxed);
        let mut paused = state.paused_queue.lock().expect("paused queue poisoned");
        for runnable in paused.drain(..) {
            self.sender.send(runnable);
        }
    }

    /// Spawn `future` tagged with `node` and schedule it immediately.
    fn spawn_on<F>(&self, node: NodeId, future: F) -> JoinHandle<F::Output>
    where
        F: Future + Send + 'static,
        F::Output: Send + 'static,
    {
        // Validates the node exists (panics otherwise) before spawning.
        self.node_state(node);
        let sender = self.sender.clone();
        let (runnable, task) = async_task::Builder::new()
            .metadata(node)
            .spawn(move |_| future, move |runnable| sender.send(runnable));
        runnable.schedule();
        JoinHandle { task }
    }

    /// Drive `future` to completion, draining the ready queue between polls.
    ///
    /// Panics with a "hang" diagnostic if nothing is runnable while the root
    /// task is unfinished — unless system threads are allowed, in which case
    /// we park briefly and let external wakeups reschedule work.
    #[track_caller]
    fn block_on<F: Future>(&self, future: F) -> F::Output {
        // Register this thread so `Sender::send` can unpark it while we wait.
        let _waiter = WaiterGuard::new(&self.queue, thread::current());
        let sender = self.sender.clone();
        // SAFETY: the root future is neither 'static nor Send, but it is only
        // polled on this thread and driven to completion (or the process
        // panics) before `block_on` returns, so it cannot outlive this frame.
        // TODO(review): confirm against async-task's `spawn_unchecked` contract.
        let (runnable, task) = unsafe {
            async_task::Builder::new()
                .metadata(NodeId::MAIN)
                .spawn_unchecked(move |_| future, move |runnable| sender.send(runnable))
        };
        runnable.schedule();
        loop {
            self.run_all_ready();
            if task.is_finished() {
                return task.now_or_never().expect("finished task should resolve");
            }
            if self.allow_system_thread.load(Ordering::Relaxed) {
                // External runtime threads may still wake us; poll again soon.
                thread::park_timeout(Duration::from_millis(1));
            } else {
                panic!("no runnable tasks; all simulated tasks are blocked");
            }
        }
    }

    /// Run every currently-ready runnable, picking each via the seeded RNG.
    fn run_all_ready(&self) {
        while let Some(runnable) = self.queue.try_recv_random(&self.rng) {
            let node = *runnable.metadata();
            let state = self.node_state(node);
            if state.paused.load(Ordering::Relaxed) {
                // Park the runnable on its node; `resume` re-enqueues it.
                state.paused_queue.lock().expect("paused queue poisoned").push(runnable);
                continue;
            }
            // NOTE(review): the panic is caught and immediately rethrown, so
            // this wrapper is currently a no-op — presumably a hook for future
            // panic accounting; confirm before simplifying.
            let result = std::panic::catch_unwind(AssertUnwindSafe(|| runnable.run()));
            if let Err(payload) = result {
                std::panic::resume_unwind(payload);
            }
        }
    }

    /// Look up `node`'s state; panics if the node was never created.
    fn node_state(&self, node: NodeId) -> Arc<NodeState> {
        self.nodes
            .lock()
            .expect("nodes poisoned")
            .get(&node)
            .cloned()
            .unwrap_or_else(|| panic!("unknown simulated node {node}"))
    }
}
/// Per-node scheduling state shared between the executor and queued runnables.
#[derive(Clone, Default)]
struct NodeState {
    // When set, this node's runnables are diverted into `paused_queue`.
    paused: Arc<AtomicBool>,
    // Runnables held back while paused; drained back to the run queue on resume.
    paused_queue: Arc<Mutex<Vec<Runnable>>>,
}
/// Cooperatively yield to the executor exactly once before resuming.
pub async fn yield_now() {
    YieldNow { yielded: false }.await
}

/// One-shot future: pending on the first poll, ready on the second.
struct YieldNow {
    yielded: bool,
}

impl Future for YieldNow {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        if !self.yielded {
            // First poll: record the yield, re-enqueue ourselves via the
            // waker, and report pending so other ready tasks get a turn.
            self.yielded = true;
            cx.waker().wake_by_ref();
            return Poll::Pending;
        }
        Poll::Ready(())
    }
}
/// RAII registration of the blocking thread as the run queue's waiter.
///
/// While alive, `Sender::send` unparks the registered thread; dropping clears
/// the registration so a finished `block_on` stops receiving wakeups.
struct WaiterGuard<'a> {
    receiver: &'a Receiver,
}

impl<'a> WaiterGuard<'a> {
    /// Register `thread` to be unparked whenever a runnable is enqueued.
    fn new(receiver: &'a Receiver, thread: Thread) -> Self {
        receiver.set_waiter(Some(thread));
        Self { receiver }
    }
}

impl Drop for WaiterGuard<'_> {
    fn drop(&mut self) {
        self.receiver.set_waiter(None);
    }
}
/// Owner of the shared run-queue state; split into `Sender`/`Receiver` halves.
struct Queue {
    inner: Arc<QueueInner>,
}

/// Producer half: enqueues runnables and unparks the waiter thread.
#[derive(Clone)]
struct Sender {
    inner: Arc<QueueInner>,
}

/// Consumer half: dequeues runnables in RNG-chosen order.
#[derive(Clone)]
struct Receiver {
    inner: Arc<QueueInner>,
}

struct QueueInner {
    // Ready runnables; storage order is irrelevant because receives sample
    // randomly (and remove with `swap_remove`).
    queue: Mutex<Vec<Runnable>>,
    // Thread currently blocked in `block_on`, if any, to unpark on new work.
    waiter: Mutex<Option<Thread>>,
}

impl Queue {
    fn new() -> Self {
        Self {
            inner: Arc::new(QueueInner {
                queue: Mutex::new(Vec::new()),
                waiter: Mutex::new(None),
            }),
        }
    }

    fn sender(&self) -> Sender {
        Sender {
            inner: self.inner.clone(),
        }
    }

    fn receiver(&self) -> Receiver {
        Receiver {
            inner: self.inner.clone(),
        }
    }
}

impl Sender {
    /// Enqueue `runnable` and wake the waiter thread, if one is registered.
    fn send(&self, runnable: Runnable) {
        self.inner.queue.lock().expect("run queue poisoned").push(runnable);
        if let Some(thread) = self.inner.waiter.lock().expect("waiter poisoned").as_ref() {
            thread.unpark();
        }
    }
}

impl Receiver {
    /// Register (or clear) the thread to unpark when work arrives.
    fn set_waiter(&self, thread: Option<Thread>) {
        *self.inner.waiter.lock().expect("waiter poisoned") = thread;
    }

    /// Remove and return an RNG-chosen ready runnable, or `None` when empty.
    ///
    /// `swap_remove` keeps this O(1); ordering is owned by the RNG, not the Vec.
    fn try_recv_random(&self, rng: &Mutex<Rng>) -> Option<Runnable> {
        let mut queue = self.inner.queue.lock().expect("run queue poisoned");
        if queue.is_empty() {
            return None;
        }
        let idx = rng.lock().expect("rng poisoned").index(queue.len());
        Some(queue.swap_remove(idx))
    }
}
}
#[cfg(test)]
mod tests {
    use std::sync::{
        atomic::{AtomicUsize, Ordering},
        Arc,
    };

    use super::*;

    /// A task spawned on a paused node must stay parked until `resume`,
    /// then run exactly once and yield its value.
    #[test]
    fn paused_node_does_not_run_until_resumed() {
        let mut runtime = Runtime::new(DstSeed(1)).unwrap();
        let node = runtime.create_node();
        runtime.pause(node);
        let runs = Arc::new(AtomicUsize::new(0));
        let task_runs = Arc::clone(&runs);
        let task = runtime.spawn_on(node, async move {
            task_runs.fetch_add(1, Ordering::SeqCst);
            7
        });
        // Drain the ready queue once; the paused task must not have executed.
        runtime.block_on(async {
            yield_now().await;
        });
        assert_eq!(runs.load(Ordering::SeqCst), 0);
        runtime.resume(node);
        assert_eq!(runtime.block_on(task), 7);
        assert_eq!(runs.load(Ordering::SeqCst), 1);
    }

    /// A `Handle` must be usable from inside a simulated task to create a
    /// node and spawn (and await) further work on it.
    #[test]
    fn handle_can_spawn_onto_node_from_simulated_task() {
        let mut runtime = Runtime::new(DstSeed(2)).unwrap();
        let handle = runtime.handle();
        let value = runtime.block_on(async move {
            let node = handle.create_node();
            handle.spawn_on(node, async { 11 }).await
        });
        assert_eq!(value, 11);
    }
}
+33
View File
@@ -0,0 +1,33 @@
//! Local simulation shim for the DST crate.
//!
//! This module is deliberately small, but its executor shape follows madsim's:
//! futures are scheduled as runnable tasks and the ready queue is sampled by a
//! deterministic RNG instead of being driven by a package-level async runtime.
mod executor;
mod rng;
use std::time::Duration;
pub use executor::{yield_now, Handle, JoinHandle, NodeId, Runtime};
pub use rng::Rng;
use crate::seed::DstSeed;
pub(crate) use rng::DecisionSource;
/// Handle type for core's production (Tokio-backed) runtime boundary.
pub(crate) type RuntimeHandle = spacetimedb_core::runtime::Handle;
/// Owning runtime guard matching `RuntimeHandle`; keep alive while the handle is used.
pub(crate) type RuntimeGuard = spacetimedb_core::runtime::Runtime;

/// Reuse the ambient runtime's handle, or create a fresh runtime and return
/// it as a guard the caller must keep alive. Delegates to core's boundary.
pub(crate) fn current_handle_or_new_runtime() -> anyhow::Result<(RuntimeHandle, Option<RuntimeGuard>)> {
    spacetimedb_core::runtime::current_handle_or_new_runtime()
}

/// Advance simulated time by `_duration`; currently a no-op placeholder.
pub(crate) fn advance_time(_duration: Duration) {
    // This is a hook, not wall-clock sleep. A future simulator layer can advance
    // virtual time here while keeping targets on the same API.
}

/// Build a seeded, deterministic decision stream for fault injection.
pub(crate) fn decision_source(seed: DstSeed) -> DecisionSource {
    DecisionSource::new(seed)
}
+74
View File
@@ -0,0 +1,74 @@
use std::sync::atomic::{AtomicU64, Ordering};
use crate::seed::DstSeed;
const GAMMA: u64 = 0x9e37_79b9_7f4a_7c15;
/// Deterministic, cheap RNG used for executor scheduling decisions.
///
/// A splitmix64-style generator: the state advances by the golden-ratio
/// increment `GAMMA` and each output is the mixed (post-increment) state.
#[derive(Clone, Debug)]
pub struct Rng {
    state: u64,
}

impl Rng {
    /// Seed the generator; the raw seed is mixed once so nearby seeds diverge.
    pub fn new(seed: DstSeed) -> Self {
        let state = splitmix64(seed.0);
        Self { state }
    }

    /// Produce the next 64-bit value in the deterministic sequence.
    pub fn next_u64(&mut self) -> u64 {
        let advanced = self.state.wrapping_add(GAMMA);
        self.state = advanced;
        splitmix64(advanced)
    }

    /// Pick an index in `0..len`; panics when `len` is zero.
    ///
    /// Plain modulo reduction — the tiny bias for lengths that do not divide
    /// 2^64 is acceptable for scheduling choices.
    pub fn index(&mut self, len: usize) -> usize {
        assert!(len > 0, "len must be non-zero");
        self.next_u64() as usize % len
    }

    /// Return `true` with roughly the given probability.
    pub fn sample_probability(&mut self, probability: f64) -> bool {
        probability_sample(self.next_u64(), probability)
    }
}
/// Lock-free deterministic decision stream for fault injection.
///
/// Unlike `Rng`, this is shared behind `&self`: state advances with a single
/// atomic add per draw.
#[derive(Debug)]
pub(crate) struct DecisionSource {
    state: AtomicU64,
}

impl DecisionSource {
    /// Seed the stream; the raw seed is mixed once so nearby seeds diverge.
    pub(crate) fn new(seed: DstSeed) -> Self {
        Self {
            state: AtomicU64::new(splitmix64(seed.0)),
        }
    }

    /// Deterministically answer "take this fault?" with the given probability.
    pub(crate) fn sample_probability(&self, probability: f64) -> bool {
        probability_sample(self.next_u64(), probability)
    }

    fn next_u64(&self) -> u64 {
        // Relaxed suffices: only the counter value matters, not ordering with
        // other memory. Note this mixes the *pre-increment* state, unlike
        // `Rng::next_u64`, so the two sequences are offset by one step.
        let state = self.state.fetch_add(GAMMA, Ordering::Relaxed);
        splitmix64(state)
    }
}
/// Map a uniform 64-bit sample to a Bernoulli draw with the given probability.
///
/// Out-of-range probabilities clamp: anything <= 0 is always `false`,
/// anything >= 1 is always `true` (a NaN probability yields `false`).
fn probability_sample(value: u64, probability: f64) -> bool {
    if probability >= 1.0 {
        return true;
    }
    if probability <= 0.0 {
        return false;
    }
    // Keep the top 53 bits so the quotient is an exactly representable f64
    // in [0, 1), then compare against the threshold.
    let unit = (value >> 11) as f64 / (1u64 << 53) as f64;
    unit < probability
}
/// One step of the splitmix64 generator: advance by `GAMMA`, then apply two
/// multiply–xorshift mixing rounds and a final xorshift for avalanche.
fn splitmix64(mut x: u64) -> u64 {
    x = x.wrapping_add(GAMMA);
    let mut z = x ^ (x >> 30);
    z = z.wrapping_mul(0xbf58_476d_1ce4_e5b9);
    z = (z ^ (z >> 27)).wrapping_mul(0x94d0_49bb_1331_11eb);
    z ^ (z >> 31)
}
+8 -17
View File
@@ -13,7 +13,7 @@ use spacetimedb_commitlog::{
segment::FileLike,
};
use crate::{config::CommitlogFaultProfile, workload::commitlog_ops::DiskFaultSummary};
use crate::{config::CommitlogFaultProfile, seed::DstSeed, sim, workload::commitlog_ops::DiskFaultSummary};
const INJECTED_DISK_ERROR_PREFIX: &str = "dst injected disk ";
@@ -123,10 +123,10 @@ pub(crate) struct BuggifiedRepo<R> {
}
impl<R> BuggifiedRepo<R> {
pub(crate) fn new(inner: R, config: CommitlogFaultConfig) -> Self {
pub(crate) fn new(inner: R, config: CommitlogFaultConfig, seed: DstSeed) -> Self {
Self {
inner,
faults: FaultController::new(config),
faults: FaultController::new(config, seed),
}
}
@@ -341,15 +341,17 @@ impl<S: SegmentReader> SegmentReader for BuggifiedReader<S> {
struct FaultController {
config: CommitlogFaultConfig,
counters: Arc<FaultCounters>,
decisions: Arc<sim::DecisionSource>,
armed: Arc<AtomicBool>,
suspended: Arc<AtomicUsize>,
}
impl FaultController {
fn new(config: CommitlogFaultConfig) -> Self {
fn new(config: CommitlogFaultConfig, seed: DstSeed) -> Self {
Self {
config,
counters: Arc::default(),
decisions: Arc::new(sim::decision_source(seed)),
armed: Arc::new(AtomicBool::new(false)),
suspended: Arc::default(),
}
@@ -379,10 +381,7 @@ impl FaultController {
} else {
Duration::from_millis(1)
};
#[cfg(all(simulation, madsim))]
madsim::time::advance(latency);
#[cfg(not(all(simulation, madsim)))]
let _ = latency;
sim::advance_time(latency);
}
}
@@ -412,15 +411,7 @@ impl FaultController {
return false;
}
#[cfg(simulation)]
{
madsim::buggify::buggify_with_prob(probability)
}
#[cfg(not(simulation))]
{
let _ = probability;
false
}
self.decisions.sample_probability(probability)
}
fn summary(&self) -> DiskFaultSummary {
+2 -35
View File
@@ -2,11 +2,7 @@
use std::{future::Future, pin::Pin};
use crate::{
config::RunConfig,
seed::DstSeed,
workload::{module_ops::HostScenarioId, table_ops::TableScenarioId},
};
use crate::{config::RunConfig, seed::DstSeed, workload::table_ops::TableScenarioId};
/// Descriptor contract: CLI talks to this, not per-target ad hoc handlers.
pub trait TargetDescriptor {
@@ -118,37 +114,8 @@ fn format_relational_db_commitlog_outcome(
outcome.disk_faults.fsync_error,
outcome.disk_faults.open_error,
outcome.disk_faults.metadata_error,
outcome.runtime.known_tokio_tasks_scheduled,
outcome.runtime.known_runtime_tasks_scheduled,
outcome.runtime.durability_actors_started,
alive_tasks
)
}
pub struct StandaloneHostDescriptor;
impl TargetDescriptor for StandaloneHostDescriptor {
const NAME: &'static str = "standalone_host";
type Scenario = HostScenarioId;
fn prepare(_seed: DstSeed, _scenario: &Self::Scenario, _config: &RunConfig) -> anyhow::Result<()> {
crate::targets::standalone_host::prepare_generated_run()
}
fn run_streaming(seed: DstSeed, scenario: Self::Scenario, config: RunConfig) -> TargetRunFuture {
Box::pin(async move {
let outcome =
crate::targets::standalone_host::run_generated_with_config_and_scenario(seed, scenario, config).await?;
Ok(format!(
"ok target={} seed={} steps={} reducer_calls={} waits={} reopens={} noops={} expected_errors={}",
Self::NAME,
seed.0,
outcome.steps_executed,
outcome.reducer_calls,
outcome.scheduler_waits,
outcome.reopens,
outcome.noops,
outcome.expected_errors
))
})
}
}
-1
View File
@@ -3,4 +3,3 @@
pub(crate) mod buggified_repo;
pub mod descriptor;
pub mod relational_db_commitlog;
pub mod standalone_host;
@@ -37,6 +37,7 @@ use crate::{
},
schema::{SchemaPlan, SimRow},
seed::DstSeed,
sim,
targets::buggified_repo::{is_injected_disk_error_text, BuggifiedRepo, CommitlogFaultConfig},
workload::{
commitlog_ops::{CommitlogInteraction, CommitlogWorkloadOutcome, DurableReplaySummary},
@@ -193,7 +194,7 @@ impl RunStats {
fn runtime_summary(&self) -> RuntimeSummary {
RuntimeSummary {
known_tokio_tasks_scheduled: self.runtime.durability_actors_started,
known_runtime_tasks_scheduled: self.runtime.durability_actors_started,
durability_actors_started: self.runtime.durability_actors_started,
runtime_alive_tasks: runtime_alive_tasks(),
}
@@ -213,10 +214,10 @@ struct RelationalDbEngine {
last_observed_durable_offset: Option<u64>,
durability: Arc<InMemoryCommitlogDurability>,
durability_opts: spacetimedb_durability::local::Options,
runtime_handle: tokio::runtime::Handle,
runtime_handle: sim::RuntimeHandle,
commitlog_repo: StressCommitlogRepo,
stats: RunStats,
_runtime_guard: Option<tokio::runtime::Runtime>,
_runtime_guard: Option<sim::RuntimeGuard>,
}
impl RelationalDbEngine {
@@ -994,7 +995,7 @@ impl RelationalDbEngine {
.map_err(|err| format!("durability wait for tx offset {target_offset} failed: {err}"))?;
}
} else if forced {
tokio::task::yield_now().await;
sim::yield_now().await;
}
self.refresh_observed_durable_offset(forced)
}
@@ -1386,27 +1387,21 @@ type InMemoryCommitlogDurability = Local<ProductValue, StressCommitlogRepo>;
struct RelationalDbBootstrap {
db: RelationalDB,
runtime_handle: tokio::runtime::Handle,
runtime_handle: sim::RuntimeHandle,
commitlog_repo: StressCommitlogRepo,
durability: Arc<InMemoryCommitlogDurability>,
durability_opts: spacetimedb_durability::local::Options,
runtime_guard: Option<tokio::runtime::Runtime>,
runtime_guard: Option<sim::RuntimeGuard>,
}
fn bootstrap_relational_db(
seed: DstSeed,
fault_profile: CommitlogFaultProfile,
) -> anyhow::Result<RelationalDbBootstrap> {
let (runtime_handle, runtime_guard) = if let Ok(handle) = tokio::runtime::Handle::try_current() {
(handle, None)
} else {
let runtime = tokio::runtime::Runtime::new()?;
(runtime.handle().clone(), Some(runtime))
};
let (runtime_handle, runtime_guard) = sim::current_handle_or_new_runtime()?;
let fault_config = CommitlogFaultConfig::for_profile(fault_profile);
configure_simulation_buggify(fault_config.enabled());
let commitlog_repo = BuggifiedRepo::new(MemoryCommitlogRepo::new(8 * 1024 * 1024), fault_config);
let commitlog_repo = BuggifiedRepo::new(MemoryCommitlogRepo::new(8 * 1024 * 1024), fault_config, seed.fork(702));
let durability_opts = commitlog_stress_options(seed.fork(701));
let durability = Arc::new(
InMemoryCommitlogDurability::open_with_repo(commitlog_repo.clone(), runtime_handle.clone(), durability_opts)
@@ -1449,23 +1444,9 @@ fn commitlog_stress_options(seed: DstSeed) -> spacetimedb_durability::local::Opt
opts
}
fn configure_simulation_buggify(enabled: bool) {
#[cfg(simulation)]
{
if enabled {
madsim::buggify::enable();
} else {
madsim::buggify::disable();
}
}
#[cfg(not(simulation))]
let _ = enabled;
}
fn runtime_alive_tasks() -> Option<usize> {
// The madsim runtime exposes live task metrics on `Runtime`, but the target
// only receives Tokio-compatible handles. Keep this explicit instead of
// reporting madsim-tokio's dummy zero-valued metrics as real data.
// The shim only exposes Tokio-compatible handles today. Keep this explicit
// until the target owns a simulator/runtime that can report live task state.
None
}
-381
View File
@@ -1,381 +0,0 @@
//! Standalone host DST target (single scenario, no migration/subscriptions).
use std::{
path::PathBuf,
sync::{Arc, OnceLock},
time::{Instant, SystemTime, UNIX_EPOCH},
};
use bytes::Bytes;
use spacetimedb_client_api::{
auth::SpacetimeAuth, routes::subscribe::WebSocketOptions, ControlStateReadAccess, ControlStateWriteAccess,
NodeDelegate,
};
use spacetimedb_client_api_messages::websocket::v1 as ws_v1;
use spacetimedb_core::{
client::{ClientActorId, ClientConfig, ClientConnection},
config::CertificateAuthority,
db::{Config as DbConfig, Storage},
host::FunctionArgs,
messages::control_db::HostType,
util::jobs::JobCores,
};
use spacetimedb_lib::{ConnectionId, Identity};
use spacetimedb_paths::{RootDir, SpacetimePaths};
use spacetimedb_sats::ProductValue;
use spacetimedb_schema::{auto_migrate::MigrationPolicy, def::FunctionVisibility};
use spacetimedb_standalone::{StandaloneEnv, StandaloneOptions};
use tracing::trace;
use crate::{
client::SessionId,
config::RunConfig,
core::{self, StreamingProperties, TargetEngine},
seed::DstSeed,
workload::module_ops::{
HostScenarioId, ModuleInteraction, ModuleReducerSpec, ModuleWorkloadOutcome, ModuleWorkloadSource,
},
};
pub type StandaloneHostOutcome = ModuleWorkloadOutcome;
pub fn prepare_generated_run() -> anyhow::Result<()> {
let _ = compiled_module()?;
Ok(())
}
pub async fn run_generated_with_config_and_scenario(
seed: DstSeed,
scenario: HostScenarioId,
config: RunConfig,
) -> anyhow::Result<StandaloneHostOutcome> {
run_once_async(seed, scenario, config).await
}
async fn run_once_async(
seed: DstSeed,
scenario: HostScenarioId,
config: RunConfig,
) -> anyhow::Result<StandaloneHostOutcome> {
let module = compiled_module()?;
let reducers = extract_reducer_specs(module.clone()).await?;
let generator = ModuleWorkloadSource::new(seed, scenario, reducers, config.max_interactions_or_default(usize::MAX));
let engine = StandaloneHostEngine::new(seed, module).await?;
core::run_streaming(generator, engine, NoopHostProperties, config).await
}
#[derive(Clone)]
struct CompiledModuleInfo {
program_bytes: Bytes,
host_type: HostType,
}
fn compiled_module() -> anyhow::Result<Arc<CompiledModuleInfo>> {
static CACHE: OnceLock<Arc<CompiledModuleInfo>> = OnceLock::new();
if let Some(cached) = CACHE.get() {
return Ok(cached.clone());
}
let module_root = PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("../../modules/module-test");
let (path, host_type) = spacetimedb_cli::build(&module_root, Some(PathBuf::from("src")).as_deref(), true, None)?;
let host_type: HostType = host_type.parse()?;
let program_bytes = std::fs::read(path)?;
let compiled = Arc::new(CompiledModuleInfo {
program_bytes: program_bytes.into(),
host_type,
});
let _ = CACHE.set(compiled.clone());
Ok(CACHE.get().expect("cache set or raced").clone())
}
async fn extract_reducer_specs(module: Arc<CompiledModuleInfo>) -> anyhow::Result<Vec<ModuleReducerSpec>> {
let module_def = spacetimedb_core::host::extract_schema(
module.program_bytes.clone().to_vec().into_boxed_slice(),
module.host_type,
)
.await?;
Ok(module_def
.reducers()
.filter(|reducer| reducer.visibility == FunctionVisibility::ClientCallable)
.map(|reducer| ModuleReducerSpec {
name: reducer.name.to_string(),
params: reducer
.params
.elements
.iter()
.map(|arg| arg.algebraic_type.clone())
.collect::<Vec<_>>(),
})
.collect::<Vec<_>>())
}
struct HostSession {
_env: Arc<StandaloneEnv>,
client: ClientConnection,
db_identity: Identity,
}
struct StandaloneHostEngine {
root_dir: RootDir,
session: Option<HostSession>,
module: Arc<CompiledModuleInfo>,
seed: DstSeed,
session_generation: u64,
step: usize,
reducer_calls: usize,
scheduler_waits: usize,
reopens: usize,
noops: usize,
expected_errors: usize,
}
impl StandaloneHostEngine {
async fn new(seed: DstSeed, module: Arc<CompiledModuleInfo>) -> anyhow::Result<Self> {
let root_dir = RootDir(std::env::temp_dir().join(format!(
"spacetimedb-dst-standalone-host-{}-{}-{}",
seed.0,
std::process::id(),
SystemTime::now().duration_since(UNIX_EPOCH)?.as_nanos()
)));
let _ = std::fs::remove_dir_all(&root_dir);
let session = open_session(
&root_dir,
&module,
None,
connection_id_for_session(seed, SessionId::ZERO, 0),
)
.await
.map_err(anyhow::Error::msg)?;
Ok(Self {
root_dir,
session: Some(session),
module,
seed,
session_generation: 1,
step: 0,
reducer_calls: 0,
scheduler_waits: 0,
reopens: 0,
noops: 0,
expected_errors: 0,
})
}
async fn execute(&mut self, interaction: &ModuleInteraction) -> Result<(), String> {
self.step = self.step.saturating_add(1);
match interaction {
ModuleInteraction::CallReducer { session, reducer, args } => {
if *session != SessionId::ZERO {
return Err(format!("standalone-host target has no session for {session}"));
}
self.reducer_calls = self.reducer_calls.saturating_add(1);
let request_id = (self.step as u32).saturating_sub(1);
let product = ProductValue::from_iter(args.iter().cloned());
let payload = spacetimedb_sats::bsatn::to_vec(&product).map_err(|e| e.to_string())?;
let res = self
.session
.as_mut()
.ok_or_else(|| "host session missing".to_string())?
.client
.call_reducer(
reducer,
FunctionArgs::Bsatn(payload.into()),
request_id,
Instant::now(),
ws_v1::CallReducerFlags::FullUpdate,
)
.await;
match res {
Ok(_) => Ok(()),
Err(err) => {
let msg = err.to_string();
if is_expected_error(reducer, &msg) {
self.expected_errors = self.expected_errors.saturating_add(1);
Ok(())
} else {
Err(format!("unexpected reducer error reducer={reducer}: {msg}"))
}
}
}
}
ModuleInteraction::WaitScheduled { millis } => {
self.scheduler_waits = self.scheduler_waits.saturating_add(1);
tokio::time::sleep(std::time::Duration::from_millis(*millis)).await;
Ok(())
}
ModuleInteraction::CloseReopen => {
self.reopens = self.reopens.saturating_add(1);
let db_identity = self
.session
.as_ref()
.ok_or_else(|| "host session missing".to_string())?
.db_identity;
let old = self.session.take();
drop(old);
let connection_id = connection_id_for_session(self.seed, SessionId::ZERO, self.session_generation);
self.session_generation = self.session_generation.saturating_add(1);
self.session =
Some(open_session(&self.root_dir, &self.module, Some(db_identity), connection_id).await?);
Ok(())
}
ModuleInteraction::NoOp => {
self.noops = self.noops.saturating_add(1);
Ok(())
}
}
}
fn outcome(&self) -> StandaloneHostOutcome {
StandaloneHostOutcome {
steps_executed: self.step,
reducer_calls: self.reducer_calls,
scheduler_waits: self.scheduler_waits,
reopens: self.reopens,
noops: self.noops,
expected_errors: self.expected_errors,
}
}
}
impl TargetEngine<ModuleInteraction> for StandaloneHostEngine {
type Observation = ();
type Outcome = StandaloneHostOutcome;
type Error = String;
#[allow(clippy::manual_async_fn)]
fn execute_interaction<'a>(
&'a mut self,
interaction: &'a ModuleInteraction,
) -> impl std::future::Future<Output = Result<Self::Observation, Self::Error>> + 'a {
async move {
trace!(?interaction, "standalone_host interaction");
self.execute(interaction).await
}
}
fn finish(&mut self) {}
#[allow(clippy::manual_async_fn)]
fn collect_outcome<'a>(&'a mut self) -> impl std::future::Future<Output = anyhow::Result<Self::Outcome>> + 'a {
async move { Ok(self.outcome()) }
}
}
struct NoopHostProperties;
impl StreamingProperties<ModuleInteraction, (), StandaloneHostEngine> for NoopHostProperties {
fn observe(
&mut self,
_engine: &StandaloneHostEngine,
_interaction: &ModuleInteraction,
_observation: &(),
) -> Result<(), String> {
Ok(())
}
fn finish(&mut self, _engine: &StandaloneHostEngine, _outcome: &StandaloneHostOutcome) -> Result<(), String> {
Ok(())
}
}
fn is_expected_error(_reducer: &str, msg: &str) -> bool {
msg.contains("permission denied")
}
fn connection_id_for_session(seed: DstSeed, session: SessionId, handle_generation: u64) -> ConnectionId {
let base = 1_000u64
.saturating_add((session.client.as_u32() as u64).saturating_mul(1_000_000))
.saturating_add((session.generation as u64).saturating_mul(10_000))
.saturating_add(handle_generation.saturating_mul(2));
let high = seed.fork(base).0 as u128;
let low = seed.fork(base.saturating_add(1)).0 as u128;
let id = (high << 64) | low;
ConnectionId::from_u128(id.max(1))
}
async fn open_session(
root_dir: &RootDir,
module: &CompiledModuleInfo,
maybe_db_identity: Option<Identity>,
connection_id: ConnectionId,
) -> Result<HostSession, String> {
let paths = SpacetimePaths::from_root_dir(root_dir);
let certs = CertificateAuthority::in_cli_config_dir(&paths.cli_config_dir);
let env = StandaloneEnv::init(
StandaloneOptions {
db_config: DbConfig {
storage: Storage::Disk,
page_pool_max_size: None,
},
websocket: WebSocketOptions::default(),
v8_heap_policy: Default::default(),
},
&certs,
paths.data_dir.into(),
JobCores::without_pinned_cores(),
)
.await
.map_err(|e| format!("standalone init failed: {e:#}"))?;
let caller_identity = Identity::ZERO;
let db_identity = match maybe_db_identity {
Some(identity) => identity,
None => {
SpacetimeAuth::alloc(&env)
.await
.map_err(|e| format!("db identity allocation failed: {e:#?}"))?
.claims
.identity
}
};
if env
.get_database_by_identity(&db_identity)
.await
.map_err(|e| format!("database lookup failed: {e:#}"))?
.is_none()
{
env.publish_database(
&caller_identity,
spacetimedb_client_api::DatabaseDef {
database_identity: db_identity,
program_bytes: module.program_bytes.clone(),
num_replicas: None,
host_type: module.host_type,
parent: None,
organization: None,
},
MigrationPolicy::Compatible,
)
.await
.map_err(|e| format!("publish module failed: {e:#}"))?;
}
let database = env
.get_database_by_identity(&db_identity)
.await
.map_err(|e| format!("database lookup after publish failed: {e:#}"))?
.ok_or_else(|| "database not found after publish".to_string())?;
let replica = env
.get_leader_replica_by_database(database.id)
.await
.ok_or_else(|| "leader replica not found".to_string())?;
let host = env
.leader(database.id)
.await
.map_err(|e| format!("leader host unavailable: {e:#}"))?;
let module_rx = host
.module_watcher()
.await
.map_err(|e| format!("module watcher failed: {e:#}"))?;
let client_id = ClientActorId {
identity: caller_identity,
connection_id,
name: env.client_actor_index().next_client_name(),
};
let client = ClientConnection::dummy(client_id, ClientConfig::for_test(), replica.id, module_rx);
Ok(HostSession {
_env: env,
client,
db_identity,
})
}
@@ -125,7 +125,7 @@ pub struct TransactionSummary {
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct RuntimeSummary {
pub known_tokio_tasks_scheduled: usize,
pub known_runtime_tasks_scheduled: usize,
pub durability_actors_started: usize,
pub runtime_alive_tasks: Option<usize>,
}
-1
View File
@@ -1,6 +1,5 @@
//! Shared workload generators reused by multiple DST targets.
pub mod commitlog_ops;
pub mod module_ops;
pub(crate) mod strategy;
pub mod table_ops;
@@ -1,128 +0,0 @@
use crate::{
client::SessionId,
core::WorkloadSource,
schema::generate_value_for_type,
seed::{DstRng, DstSeed},
workload::strategy::{Index, Strategy, Weighted},
};
use super::{HostScenarioId, ModuleInteraction, ModuleReducerSpec};
const MAX_REGEN_ATTEMPTS: usize = 16;
#[derive(Clone, Copy, Debug)]
enum ActionKind {
Reducer,
Wait,
Reopen,
}
/// Deterministic source for standalone-host interactions.
pub(crate) struct ModuleWorkloadSource {
scenario: HostScenarioId,
reducers: Vec<ModuleReducerSpec>,
rng: DstRng,
target_interactions: usize,
emitted: usize,
}
impl ModuleWorkloadSource {
pub fn new(
seed: DstSeed,
scenario: HostScenarioId,
reducers: Vec<ModuleReducerSpec>,
target_interactions: usize,
) -> Self {
Self {
scenario,
reducers,
rng: seed.fork(300).rng(),
target_interactions,
emitted: 0,
}
}
pub fn request_finish(&mut self) {
self.target_interactions = self.emitted;
}
fn choose_action(&mut self) -> ActionKind {
match self.scenario {
HostScenarioId::HostSmoke => Weighted::new(vec![
(85, ActionKind::Reducer),
(10, ActionKind::Wait),
(5, ActionKind::Reopen),
])
.sample(&mut self.rng),
}
}
fn generate_reducer_interaction(&mut self) -> Option<ModuleInteraction> {
if self.reducers.is_empty() {
return None;
}
let idx = Index::new(self.reducers.len()).sample(&mut self.rng);
let spec = &self.reducers[idx];
let mut args = Vec::with_capacity(spec.params.len());
for (arg_index, ty) in spec.params.iter().enumerate() {
if !supports_generation(ty) {
return None;
}
args.push(generate_value_for_type(&mut self.rng, ty, arg_index));
}
Some(ModuleInteraction::CallReducer {
session: SessionId::ZERO,
reducer: spec.name.clone(),
args,
})
}
fn generate_next(&mut self) -> ModuleInteraction {
for _ in 0..MAX_REGEN_ATTEMPTS {
let next = match self.choose_action() {
ActionKind::Reducer => self.generate_reducer_interaction(),
ActionKind::Wait => Some(ModuleInteraction::WaitScheduled { millis: 1_200 }),
ActionKind::Reopen => Some(ModuleInteraction::CloseReopen),
};
if let Some(next) = next {
return next;
}
}
ModuleInteraction::NoOp
}
}
fn supports_generation(ty: &spacetimedb_sats::AlgebraicType) -> bool {
use spacetimedb_sats::AlgebraicType;
matches!(
ty,
AlgebraicType::Bool
| AlgebraicType::I8
| AlgebraicType::U8
| AlgebraicType::I16
| AlgebraicType::U16
| AlgebraicType::I32
| AlgebraicType::U32
| AlgebraicType::I64
| AlgebraicType::U64
| AlgebraicType::I128
| AlgebraicType::U128
| AlgebraicType::String
)
}
impl WorkloadSource for ModuleWorkloadSource {
type Interaction = ModuleInteraction;
fn next_interaction(&mut self) -> Option<Self::Interaction> {
if self.emitted >= self.target_interactions {
return None;
}
self.emitted += 1;
Some(self.generate_next())
}
fn request_finish(&mut self) {
Self::request_finish(self);
}
}
@@ -1,7 +0,0 @@
//! Workload for standalone host/module testing.
mod generation;
mod types;
pub(crate) use generation::ModuleWorkloadSource;
pub use types::{HostScenarioId, ModuleInteraction, ModuleReducerSpec, ModuleWorkloadOutcome};
@@ -1,43 +0,0 @@
use spacetimedb_sats::AlgebraicType;
use crate::client::SessionId;
/// Single v1 scenario for standalone host target.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum HostScenarioId {
#[default]
HostSmoke,
}
/// Reducer metadata used by the typed argument generator.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct ModuleReducerSpec {
pub name: String,
pub params: Vec<AlgebraicType>,
}
/// One standalone-host interaction.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum ModuleInteraction {
CallReducer {
session: SessionId,
reducer: String,
args: Vec<spacetimedb_sats::AlgebraicValue>,
},
WaitScheduled {
millis: u64,
},
CloseReopen,
NoOp,
}
/// Run summary for standalone-host target.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct ModuleWorkloadOutcome {
pub steps_executed: usize,
pub reducer_calls: usize,
pub scheduler_waits: usize,
pub reopens: usize,
pub noops: usize,
pub expected_errors: usize,
}
-39
View File
@@ -1,39 +0,0 @@
#![cfg(all(simulation, madsim))]
use std::{net::SocketAddr, sync::Arc};
use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
sync::Barrier,
};
#[test]
fn tcp_round_trip_over_madsim_tokio() {
let runtime = madsim::runtime::Runtime::new();
let server_addr: SocketAddr = "10.0.0.1:1".parse().unwrap();
let client_addr: SocketAddr = "10.0.0.2:1".parse().unwrap();
let server = runtime.create_node().ip(server_addr.ip()).build();
let client = runtime.create_node().ip(client_addr.ip()).build();
let ready = Arc::new(Barrier::new(2));
let server_ready = ready.clone();
let server_task = server.spawn(async move {
let listener = tokio::net::TcpListener::bind(server_addr).await.unwrap();
server_ready.wait().await;
let (mut stream, _) = listener.accept().await.unwrap();
stream.write_all(b"pong").await.unwrap();
stream.flush().await.unwrap();
});
let client_task = client.spawn(async move {
ready.wait().await;
let mut stream = tokio::net::TcpStream::connect(server_addr).await.unwrap();
let mut response = [0; 4];
stream.read_exact(&mut response).await.unwrap();
assert_eq!(&response, b"pong");
});
runtime.block_on(server_task).unwrap();
runtime.block_on(client_task).unwrap();
}
+2 -2
View File
@@ -23,13 +23,13 @@ spacetimedb-fs-utils.workspace = true
spacetimedb-paths.workspace = true
spacetimedb-sats.workspace = true
thiserror.workspace = true
tokio = { package = "madsim-tokio", path = "../../../../madsim/madsim-tokio", features = ["full"] }
tokio.workspace = true
tracing.workspace = true
[dev-dependencies]
spacetimedb-commitlog = { workspace = true, features = ["test"] }
tempfile.workspace = true
tokio = { package = "madsim-tokio", path = "../../../../madsim/madsim-tokio", features = ["full"] }
tokio.workspace = true
[lints]
workspace = true
+1 -1
View File
@@ -7,7 +7,7 @@ license-file = "LICENSE"
description = "Filesystem and network IO facade for SpacetimeDB crates"
[dependencies]
tokio = { package = "madsim-tokio", path = "../../../../madsim/madsim-tokio", features = ["full"] }
tokio.workspace = true
[lints]
workspace = true
+2 -3
View File
@@ -1,8 +1,7 @@
//! Narrow facade for SpacetimeDB-owned async IO boundaries.
//!
//! Production builds use Tokio through the `madsim-tokio` compatibility crate.
//! Simulation builds use the simulator implementations exposed by that same
//! compatibility crate.
//! This crate currently re-exports the Tokio filesystem, IO, and network APIs
//! that SpacetimeDB code is allowed to depend on directly.
//!
//! This crate is intentionally small. It is a migration point for filesystem and
//! network APIs reached by deterministic simulation tests, not a general runtime