Files
Shubham Mishra 0be66e3e3d Deterministic runtime crate (#5016)
# Description of Changes.

Introduces a deterministic runtime crate.
Integrates it with RelationalDB.

Suggested review order:
- Read the
[README](https://github.com/clockworklabs/SpacetimeDB/blob/shub/sim/crates/runtime/README.md)
of runtime crate.
- Look at the integration with existing crates - `durability`, `core`,
`snapshot`, etc.
- Read runtime crate's code.

Draft branch for testing the code:
https://github.com/clockworklabs/SpacetimeDB/pull/5019

# API and ABI breaking changes
NA

# Expected complexity level and risk
This is not intended to change any production functionality, but it is a
large amount of code.

# Testing

- The new crate contains unit and integration tests.
- Existing tests should continue to pass for production code.

---------

Signed-off-by: Shubham Mishra <shivam828787@gmail.com>
Co-authored-by: Zeke Foppa <196249+bfops@users.noreply.github.com>
2026-06-04 14:59:37 +00:00

363 lines
14 KiB
Rust

#![cfg(feature = "simulation")]
#![allow(clippy::disallowed_macros)]
use std::{sync::Arc, time::Duration};
use futures::{
channel::{mpsc, oneshot},
StreamExt,
};
use spacetimedb_runtime::sim::{buggify, Rng, Runtime};
use spin::Mutex;
/// A single reply sent by a simulated server worker back to the client.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
struct Response {
    /// Id of the request this reply answers.
    id: u64,
    /// Payload the server computed for the request.
    value: u64,
    /// Runtime clock reading taken when the worker built the reply.
    at: Duration,
}
/// One entry in the server-side trace. Tests inspect these entries to check
/// both the schedule and the outcome of injected faults.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum ServerEvent {
    /// The server task took request `id` from its inbox at time `at`.
    Received { id: u64, at: Duration },
    /// A worker discarded request `id` at time `at` without replying.
    Dropped { id: u64, at: Duration },
    /// A worker produced the reply for request `id` at time `at`.
    Replied { id: u64, at: Duration },
}
/// A request travelling from a client task to the simulated server.
struct Request {
    /// Caller-chosen identifier, echoed in the reply and in the server trace.
    id: u64,
    /// Value the server multiplies to produce the reply payload.
    input: u64,
    /// One-shot channel on which the worker delivers the reply. If the worker
    /// drops the request, nothing is sent and the client observes `None`.
    respond_to: oneshot::Sender<Response>,
}
/// Everything observed from one run of the client/server workload under a
/// single seed. Two runs compare equal only if all three parts match.
#[derive(Debug, Eq, PartialEq)]
struct ClientServerRun {
    /// `(request id, reply)` pairs in the order the client spawned its
    /// request tasks; the reply is `None` when the server dropped the request.
    responses: Vec<(u64, Option<Response>)>,
    /// Snapshot of the trace recorded by the server task and its workers.
    server_events: Vec<ServerEvent>,
    /// The runtime's elapsed time, read after the workload finished.
    elapsed: Duration,
}
/// Asserts that `elapsed` is no shorter than `expected`.
///
/// # Panics
///
/// Panics with both durations in the message when `elapsed < expected`.
/// Thanks to `#[track_caller]`, the panic location is the line in the test
/// that called this helper rather than the line inside the helper.
#[track_caller]
fn assert_elapsed_at_least(elapsed: Duration, expected: Duration) {
    assert!(elapsed >= expected, "elapsed {elapsed:?} < expected {expected:?}");
}
/// Pins the exact outcome of the client/server workload for seed 404.
///
/// Both the client-visible replies and the order of the server-side trace are
/// compared against fixed expectations, so any change in scheduling or fault
/// injection for this seed fails the test. Timestamps are only bounded from
/// below.
#[test]
fn client_server_buggify_injects_deterministic_faults() {
    let run = run_buggified_client_server(404);

    // Client view: which requests were answered, and with what payload.
    let answered: Vec<_> = run
        .responses
        .iter()
        .map(|(id, reply)| (*id, reply.as_ref().map(|reply| (reply.id, reply.value))))
        .collect();
    assert_eq!(
        answered,
        vec![
            (0, Some((0, 40))),
            (1, None),
            (2, Some((2, 70))),
            (3, Some((3, 90))),
            (4, None),
        ]
    );

    // Every delivered reply must be stamped no earlier than its worker's sleep.
    for reply in run.responses.iter().filter_map(|(_, reply)| reply.as_ref()) {
        let expected = match reply.id {
            0 => Duration::from_millis(1),
            2 => Duration::from_millis(3),
            3 => Duration::from_millis(4),
            _ => continue,
        };
        assert!(reply.at >= expected, "timestamp {:?} < expected {:?}", reply.at, expected);
    }

    // Server view: the order in which events were recorded, ignoring timestamps.
    let trace: Vec<_> = run
        .server_events
        .iter()
        .map(|event| match *event {
            ServerEvent::Received { id, .. } => ("Received", id),
            ServerEvent::Dropped { id, .. } => ("Dropped", id),
            ServerEvent::Replied { id, .. } => ("Replied", id),
        })
        .collect();
    assert_eq!(
        trace,
        vec![
            ("Received", 3),
            ("Received", 1),
            ("Received", 0),
            ("Received", 4),
            ("Received", 2),
            ("Replied", 0),
            ("Dropped", 1),
            ("Replied", 2),
            ("Replied", 3),
            ("Dropped", 4),
        ]
    );

    // Receipt may happen at any time; drops and replies happen only after the
    // worker has slept `id + 1` milliseconds.
    for event in &run.server_events {
        let (at, expected) = match *event {
            ServerEvent::Received { at, .. } => (at, Duration::ZERO),
            ServerEvent::Dropped { id, at } | ServerEvent::Replied { id, at } => {
                (at, Duration::from_millis(id + 1))
            }
        };
        assert!(at >= expected, "timestamp {:?} < expected {:?}", at, expected);
    }

    assert_elapsed_at_least(run.elapsed, Duration::from_millis(5));
}
/// Runs the same client/server workload under seeds 404 and 405 and requires
/// the complete results (responses, server trace, elapsed time) to differ.
/// Both results are printed to stderr before the comparison.
#[test]
fn client_server_buggify_differs_across_seeds() {
    // The array is mapped front to back, so seed 404 runs before seed 405.
    let [first, second] = [404, 405].map(run_buggified_client_server);
    eprintln!("seed 404: {first:#?}");
    eprintln!("seed 405: {second:#?}");
    assert_ne!(first, second);
}
/// Fixed request set used by the client workload, as `(id, input)` pairs.
///
/// The client spawns one request task per entry, in this order.
const CLIENT_REQUESTS: [(u64, u64); 5] = [(0, 4), (1, 5), (2, 7), (3, 9), (4, 11)];
/// Run a small concurrent client/server workload under one seed.
///
/// The client spawns one simulated task per entry in [`CLIENT_REQUESTS`]; each
/// task submits its request and waits for the reply. The server task receives
/// exactly `CLIENT_REQUESTS.len()` requests, in whatever order they arrive,
/// and spawns one worker per request. Each worker sleeps for `id + 1`
/// milliseconds and then either drops the request (when buggify fires, with
/// probability 0.4) or replies with `input * 10`.
///
/// Returns the client-visible responses in request-spawn order (`None` for a
/// dropped request), the server's event trace, and the runtime's elapsed time.
///
/// # Panics
///
/// Panics if any simulated task fails to complete, if the request channel
/// closes before the server has received every request, or if a reply cannot
/// be delivered.
fn run_buggified_client_server(seed: u64) -> ClientServerRun {
    // --- setup: runtime, buggify, two nodes, and communication channels ---
    let mut runtime = Runtime::new(seed);
    buggify::enable(&runtime);
    let handle = runtime.handle();
    let client_node = runtime.create_node().name("client").build();
    let server_node = runtime.create_node().name("server").build();
    // mpsc channel: client tasks send Request messages to the server task
    let (request_tx, mut request_rx) = mpsc::unbounded::<Request>();
    let server_events = Arc::new(Mutex::new(Vec::new()));
    let (responses, server_events) = runtime.block_on(async move {
        // --- server: receive every request, spawn one worker per request ---
        let server_handle = handle.clone();
        let server_events_for_server = Arc::clone(&server_events);
        let server = server_node.clone().spawn(async move {
            let mut workers = Vec::new();
            // The loop is bounded by the request table rather than a literal
            // count, so the server and client cannot drift apart when
            // `CLIENT_REQUESTS` changes.
            for _ in 0..CLIENT_REQUESTS.len() {
                let request = request_rx.next().await.expect("client should send request");
                server_events_for_server.lock().push(ServerEvent::Received {
                    id: request.id,
                    at: server_handle.now(),
                });
                // --- server worker: simulate latency, then drop or reply based on buggify ---
                let worker_handle = server_handle.clone();
                let worker_events = Arc::clone(&server_events_for_server);
                workers.push(server_node.clone().spawn(async move {
                    // Deterministic latency: each request id has a distinct sleep
                    worker_handle.sleep(Duration::from_millis(request.id + 1)).await;
                    // buggify decides whether to drop this request (40% probability)
                    if worker_handle.buggify_with_prob(0.4) {
                        worker_events.lock().push(ServerEvent::Dropped {
                            id: request.id,
                            at: worker_handle.now(),
                        });
                        // Returning drops `request.respond_to`, which the
                        // client task observes as a missing reply.
                        return;
                    }
                    // No fault injected: send the reply
                    let response = Response {
                        id: request.id,
                        value: request.input * 10,
                        at: worker_handle.now(),
                    };
                    worker_events.lock().push(ServerEvent::Replied {
                        id: request.id,
                        at: response.at,
                    });
                    request
                        .respond_to
                        .send(response)
                        .expect("client should wait for response");
                }));
            }
            // Wait for all server workers to complete
            for worker in workers {
                worker.await.expect("server worker should complete");
            }
        });
        // --- client: spawn one task per request, send them to server, collect responses ---
        let client_outer_node = client_node.clone();
        let client = client_node.spawn(async move {
            let mut requests = Vec::new();
            // Spawn a task for each request so they submit concurrently
            for (id, input) in CLIENT_REQUESTS {
                let request_tx = request_tx.clone();
                let client_request_node = client_outer_node.clone();
                requests.push(client_request_node.spawn(async move {
                    let (respond_to, response_rx) = oneshot::channel();
                    request_tx
                        .unbounded_send(Request { id, input, respond_to })
                        .expect("server inbox should be open");
                    // Await the server's reply (None if the server dropped this request)
                    (id, response_rx.await.ok())
                }));
            }
            // Release this task's own sender; each request task keeps its own
            // clone. The server does not rely on the channel closing: it stops
            // after receiving a fixed number of requests.
            drop(request_tx);
            // Collect responses in spawn order
            let mut responses = Vec::new();
            for request in requests {
                responses.push(request.await.expect("client request task should complete"));
            }
            responses
        });
        // Drive both client and server to completion
        let responses = client.await.expect("client task should complete");
        server.await.expect("server task should complete");
        (responses, server_events.lock().clone())
    });
    // --- package the results: client responses, server trace, and elapsed time ---
    ClientServerRun {
        responses,
        server_events,
        elapsed: runtime.elapsed(),
    }
}
/// Exercises the executor, node pause/resume, and timer wheel together:
/// work spawned on a paused node must not run until the node is resumed, and
/// all nodes must observe one shared virtual clock.
///
/// Node `b` is paused before anything is spawned on it. The main task sleeps
/// 1ms, records `main_resumed_b`, and only then resumes the node, so every
/// event from `b` must come after that marker in the shared event log.
#[test]
fn multi_node_runtime_coordinates_pause_resume_and_virtual_time() {
    let mut runtime = Runtime::new(101);
    let handle = runtime.handle();
    let node_a = runtime.create_node().name("a").build();
    let node_b = runtime.create_node().name("b").build();
    let events = Arc::new(Mutex::new(Vec::new()));
    node_b.pause();
    runtime.block_on({
        let events = Arc::clone(&events);
        async move {
            let a_handle = handle.clone();
            let a_events = Arc::clone(&events);
            let a = node_a.spawn(async move {
                a_events.lock().push(("a_started", a_handle.now()));
                a_handle.sleep(Duration::from_millis(3)).await;
                a_events.lock().push(("a_finished", a_handle.now()));
            });
            let b_handle = handle.clone();
            let b_events = Arc::clone(&events);
            let b = node_b.spawn(async move {
                b_events.lock().push(("b_started", b_handle.now()));
                b_handle.sleep(Duration::from_millis(2)).await;
                b_events.lock().push(("b_finished", b_handle.now()));
            });
            handle.sleep(Duration::from_millis(1)).await;
            // Record the marker before resuming so the log order proves that
            // node b did nothing while it was paused.
            events.lock().push(("main_resumed_b", handle.now()));
            node_b.resume();
            a.await.expect("node a task should complete");
            b.await.expect("node b task should complete");
        }
    });

    let events = events.lock();
    // Index of a named event in the log; names the missing event on failure.
    let position = |name: &str| {
        events
            .iter()
            .position(|(recorded, _)| *recorded == name)
            .unwrap_or_else(|| panic!("event {name:?} was never recorded"))
    };
    // Timestamp stored with a named event.
    let at = |name: &str| events[position(name)].1;

    let a_started = at("a_started");
    let a_finished = at("a_finished");
    let resumed = at("main_resumed_b");
    let b_started = at("b_started");
    let b_finished = at("b_finished");

    // The paused node must not run anything before main resumes it.
    assert!(
        position("main_resumed_b") < position("b_started"),
        "node b ran before it was resumed"
    );
    // main resumes b only after its own 1ms sleep
    assert!(resumed >= Duration::from_millis(1));
    // b starts after the resume, on the same clock main observed
    assert!(b_started >= resumed, "b_started {b_started:?} < resumed {resumed:?}");
    assert!(b_started >= Duration::from_millis(1));
    // each task's sleep lasts at least its requested duration
    assert!(a_finished >= a_started + Duration::from_millis(3));
    assert!(b_finished >= b_started + Duration::from_millis(2));
    // both finish no earlier than 3ms
    assert!(a_finished >= Duration::from_millis(3));
    assert!(b_finished >= Duration::from_millis(3));
    assert_elapsed_at_least(runtime.elapsed(), Duration::from_millis(3));
}
/// Confirms that buggify decisions made through the runtime match, draw for
/// draw, those of a standalone `Rng` created with the same seed, which is
/// what makes injected faults replayable by seed. Also checks that disabling
/// buggify suppresses injection even at probability 1.0.
#[test]
fn runtime_buggify_matches_standalone_rng_sequence() {
    let seed = 77;
    let runtime = Runtime::new(seed);
    let reference_rng = Rng::new(seed);
    buggify::enable(&runtime);
    reference_rng.enable_buggify();

    // Take all runtime draws first, then all reference draws.
    let runtime_draws: Vec<_> =
        std::iter::repeat_with(|| buggify::should_inject_fault_with_prob(&runtime, 0.5))
            .take(8)
            .collect();
    let reference_draws: Vec<_> = std::iter::repeat_with(|| reference_rng.buggify_with_prob(0.5))
        .take(8)
        .collect();
    assert_eq!(runtime_draws, reference_draws);

    assert!(buggify::is_enabled(&runtime));
    buggify::disable(&runtime);
    assert!(!buggify::is_enabled(&runtime));
    // Once disabled, even a certain fault must not be injected.
    assert!(!buggify::should_inject_fault_with_prob(&runtime, 1.0));
}
/// Verifies that timeout races are driven by virtual time rather than wall
/// time: the fast node finishes its 2ms sleep, while the slow node's 10ms
/// sleep is cut short by a 4ms timeout on the same clock.
#[test]
fn multi_node_timeout_uses_shared_virtual_clock() {
    let mut runtime = Runtime::new(303);
    let handle = runtime.handle();
    let slow_node = runtime.create_node().name("slow").build();
    let fast_node = runtime.create_node().name("fast").build();

    let (slow_outcome, (fast_label, fast_done_at)) = runtime.block_on(async move {
        let slow_clock = handle.clone();
        let slow_task = slow_node.spawn(async move {
            // A sleep that outlasts the timeout wrapped around it.
            let overlong_sleep = async {
                slow_clock.sleep(Duration::from_millis(10)).await;
                "slow-finished"
            };
            slow_clock.timeout(Duration::from_millis(4), overlong_sleep).await
        });

        let fast_clock = handle.clone();
        let fast_task = fast_node.spawn(async move {
            fast_clock.sleep(Duration::from_millis(2)).await;
            ("fast-finished", fast_clock.now())
        });

        // Await the slow task first, then the fast one.
        let slow_result = slow_task.await.expect("slow node task should complete");
        let fast_result = fast_task.await.expect("fast node task should complete");
        (slow_result, fast_result)
    });

    assert_eq!(fast_label, "fast-finished");
    assert!(fast_done_at >= Duration::from_millis(2));
    // The slow task must have timed out, and the error reports the 4ms limit.
    assert_eq!(slow_outcome.unwrap_err().duration(), Duration::from_millis(4));
    assert_elapsed_at_least(runtime.elapsed(), Duration::from_millis(4));
}