Files
SpacetimeDB/crates/testing/tests/standalone_integration_test.rs
Mazdak Farrokhzad 0a3251708a Add ProcedureContext::with_tx (#3638)
# Description of Changes

Adds `ProcedureContext::{with_tx, try_with_tx}`.

Fixes https://github.com/clockworklabs/SpacetimeDB/issues/3515.

# API and ABI breaking changes

None

# Expected complexity level and risk

2

# Testing

An integration test `test_calling_with_tx` is added.
2025-11-19 20:33:02 +00:00

452 lines
14 KiB
Rust

use serial_test::serial;
use spacetimedb_lib::sats::{product, AlgebraicValue};
use spacetimedb_testing::modules::{
CompilationMode, CompiledModule, Csharp, LogLevel, LoggerRecord, ModuleHandle, ModuleLanguage, Rust, TypeScript,
DEFAULT_CONFIG, IN_MEMORY_CONFIG,
};
use std::{
future::Future,
time::{Duration, Instant},
};
fn init() {
let _ = env_logger::builder()
.parse_filters(
"spacetimedb=trace,spacetimedb_client_api=trace,spacetimedb_lib=trace,spacetimedb_standalone=trace",
)
.is_test(true)
// `try_init` and ignore failures to continue if a logger is already registered.
// This allows us to call `init` at the start of every test without a `once_cell` or similar.
.try_init();
}
async fn read_logs(module: &ModuleHandle) -> Vec<String> {
module
.read_log(None)
.await
.trim()
.split('\n')
.map(|line| {
let record: LoggerRecord = serde_json::from_str(line).unwrap();
if matches!(record.level, LogLevel::Panic | LogLevel::Error | LogLevel::Warn) {
panic!("Found an error-like log line: {line}");
}
record.message
})
.skip_while(|line| line != "Database initialized")
.skip(1)
.collect::<Vec<_>>()
}
// The tests MUST be run in sequence because they read the OS environment
// and can cause a race when run in parallel.
fn test_calling_a_reducer_in_module(module_name: &'static str) {
init();
CompiledModule::compile(module_name, CompilationMode::Debug).with_module_async(
DEFAULT_CONFIG,
|module| async move {
let json =
r#"{"CallReducer": {"reducer": "add", "args": "[\"Tyrion\", 24]", "request_id": 0, "flags": 0 }}"#
.to_string();
module.send(json).await.unwrap();
let json =
r#"{"CallReducer": {"reducer": "add", "args": "[\"Cersei\", 31]", "request_id": 1, "flags": 0 }}"#
.to_string();
module.send(json).await.unwrap();
let json =
r#"{"CallReducer": {"reducer": "say_hello", "args": "[]", "request_id": 2, "flags": 0 }}"#.to_string();
module.send(json).await.unwrap();
let json = r#"{"CallReducer": {"reducer": "list_over_age", "args": "[30]", "request_id": 3, "flags": 0 }}"#
.to_string();
module.send(json).await.unwrap();
let json =
r#"{"CallReducer": {"reducer": "log_module_identity", "args": "[]", "request_id": 4, "flags": 0 }}"#
.to_string();
module.send(json).await.unwrap();
assert_eq!(
read_logs(&module).await,
[
"Hello, Tyrion!",
"Hello, Cersei!",
"Hello, World!",
"Cersei has age 31 >= 30",
]
.into_iter()
.map(String::from)
.chain(std::iter::once(format!("Module identity: {}", module.db_identity)))
.collect::<Vec<_>>()
);
},
);
}
#[test]
#[serial]
fn test_calling_a_reducer() {
test_calling_a_reducer_in_module("module-test");
}
#[test]
#[serial]
fn test_calling_a_reducer_csharp() {
test_calling_a_reducer_in_module("module-test-cs");
}
#[test]
#[serial]
fn test_calling_a_reducer_typescript() {
test_calling_a_reducer_in_module("module-test-ts");
}
#[test]
#[serial]
fn test_calling_a_reducer_with_private_table() {
init();
CompiledModule::compile("module-test", CompilationMode::Debug).with_module_async(
DEFAULT_CONFIG,
|module| async move {
module
.call_reducer_binary("add_private", &product!["Tyrion"])
.await
.unwrap();
module.call_reducer_binary("query_private", &product![]).await.unwrap();
let logs = read_logs(&module)
.await
.into_iter()
.skip_while(|r| r.starts_with("Timestamp"))
.collect::<Vec<_>>();
assert_eq!(logs, ["Private, Tyrion!", "Private, World!",].map(String::from));
},
);
}
async fn read_log_skip_repeating(module: &ModuleHandle) -> String {
let logs = read_logs(module).await;
let mut logs = logs
.into_iter()
// Filter out log lines from the `repeating_test` reducer,
// which runs frequently enough to appear in our logs after we've slept a second.
.filter(|line| !line.starts_with("Timestamp: Timestamp { __timestamp_micros_since_unix_epoch__: "))
.collect::<Vec<_>>();
if logs.len() != 1 {
panic!("Expected a single log message but found {logs:#?}");
};
logs.swap_remove(0)
}
fn test_calling_a_procedure_in_module(module_name: &'static str) {
init();
CompiledModule::compile(module_name, CompilationMode::Debug).with_module_async(
DEFAULT_CONFIG,
|module| async move {
let json = r#"
{
"CallProcedure": {
"procedure": "sleep_one_second",
"args": "[]",
"request_id": 0,
"flags": 0
}
}"#
.to_string();
module.send(json).await.unwrap();
// It sleeps one second, but we'll wait two just to be safe.
tokio::time::sleep(std::time::Duration::from_secs(2)).await;
let log_sleep = read_log_skip_repeating(&module).await;
assert!(log_sleep.starts_with("Slept from "));
assert!(log_sleep.contains("a total of"));
},
)
}
#[test]
#[serial]
fn test_calling_a_procedure() {
test_calling_a_procedure_in_module("module-test");
}
fn test_calling_with_tx_in_module(module_name: &'static str) {
init();
CompiledModule::compile(module_name, CompilationMode::Debug).with_module_async(
DEFAULT_CONFIG,
|module| async move {
let json = r#"
{
"CallProcedure": {
"procedure": "with_tx",
"args": "[]",
"request_id": 0,
"flags": 0
}
}"#
.to_string();
module.send(json).await.unwrap();
// Wait 1 second just to be safe.
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
let log = read_log_skip_repeating(&module).await;
assert!(log.contains("Hello, World!"));
},
)
}
#[test]
#[serial]
fn test_calling_with_tx() {
test_calling_with_tx_in_module("module-test");
}
/// Invoke the `module-test` module,
/// use `caller` to invoke its `test` reducer,
/// and assert that its logs look right.
///
/// `caller` must invoke the reducer with args equivalent to:
/// ```ignore
/// [
/// TestA {
/// x: 0,
/// y: 2,
/// z: "Macro".to_string(),
/// },
/// TestB {
/// foo: "Foo".to_string(),
/// },
/// TestC::Foo,
/// TestF::Baz("buzz".to_string()),
/// ]
/// ```
fn test_call_query_macro_with_caller<F: Future<Output = ()>>(caller: impl FnOnce(ModuleHandle) -> F) {
CompiledModule::compile("module-test", CompilationMode::Debug).with_module_async(
DEFAULT_CONFIG,
|module| async move {
caller(module.clone()).await;
let logs = read_logs(&module)
.await
.into_iter()
.filter(|line| {
!(line.starts_with("sender:") || line.starts_with("timestamp:") || line.starts_with("Timestamp"))
})
.collect::<Vec<_>>();
assert_eq!(
logs,
[
"BEGIN",
r#"bar: "Foo""#,
"Foo",
"buzz",
"Row count before delete: 1000",
r#"Inserted: TestE { id: 1, name: "Tyler" }"#,
"Row count after delete: 995",
"Row count filtered by condition: 995",
"MultiColumn",
"Row count filtered by multi-column condition: 199",
"END",
]
.map(String::from)
);
},
);
}
/// Call the `module-test` module's `test` reducer with a variety of ways of passing arguments.
#[test]
#[serial]
fn test_call_query_macro() {
// Hand-written JSON. This will fail if the JSON encoding of `ClientMessage` changes.
test_call_query_macro_with_caller(|module| async move {
// Note that JSON doesn't allow multiline strings, so the encoded args string must be on one line!
let json = r#"
{ "CallReducer": {
"reducer": "test",
"args":
"[ { \"x\": 0, \"y\": 2, \"z\": \"Macro\" }, { \"foo\": \"Foo\" }, { \"Foo\": {} }, { \"Baz\": \"buzz\" } ]",
"request_id": 0,
"flags": 0
} }"#
.to_string();
module.send(json).await.unwrap();
});
let args_pv = &product![
product![0u32, 2u32, "Macro"],
product!["Foo"],
AlgebraicValue::sum(0, AlgebraicValue::unit()),
AlgebraicValue::sum(2, AlgebraicValue::String("buzz".into())),
];
// JSON via the `Serialize` path.
test_call_query_macro_with_caller(|module| async move {
module.call_reducer_json("test", args_pv).await.unwrap();
});
// BSATN via the `Serialize` path.
test_call_query_macro_with_caller(|module| async move {
module.call_reducer_binary("test", args_pv).await.unwrap();
});
}
#[test]
#[serial]
/// This test runs the index scan workloads in the `perf-test` module.
/// Timing spans should be < 1ms if the correct index was used.
/// Otherwise these workloads will degenerate into full table scans.
fn test_index_scans() {
init();
CompiledModule::compile("perf-test", CompilationMode::Release).with_module_async(
IN_MEMORY_CONFIG,
|module| async move {
let no_args = &product![];
module
.call_reducer_binary("load_location_table", no_args)
.await
.unwrap();
module
.call_reducer_binary("test_index_scan_on_id", no_args)
.await
.unwrap();
module
.call_reducer_binary("test_index_scan_on_chunk", no_args)
.await
.unwrap();
module
.call_reducer_binary("test_index_scan_on_x_z_dimension", no_args)
.await
.unwrap();
module
.call_reducer_binary("test_index_scan_on_x_z", no_args)
.await
.unwrap();
let logs = read_logs(&module).await;
// Each timing span should be < 1ms
let timing = |line: &str| {
line.starts_with("Timing span")
&& (line.ends_with("ns") || line.ends_with("us") || line.ends_with("µs"))
};
assert!(timing(&logs[0]));
assert!(timing(&logs[1]));
assert!(timing(&logs[2]));
assert!(timing(&logs[3]));
},
);
}
async fn bench_call(module: &ModuleHandle, call: &str, count: &u32) -> Duration {
let now = Instant::now();
// Note: using JSON variant because some functions accept u64 instead, so we rely on JSON's dynamic typing.
module.call_reducer_json(call, &product![*count]).await.unwrap();
now.elapsed()
}
#[allow(clippy::disallowed_macros)]
async fn _run_bench_db(module: ModuleHandle, benches: &[(&str, u32, &str)]) {
let expect: Vec<_> = benches.iter().map(|x| x.2.to_string()).collect();
let mut timings = Vec::with_capacity(benches.len());
for (name, count, _) in benches {
let elapsed = bench_call(&module, name, count).await;
timings.push((name, count, elapsed));
}
assert_eq!(read_logs(&module).await, expect);
for (name, rows, elapsed) in timings {
println!("RUN {name:<30} x {rows:>10} rows: {elapsed:>20.3?}");
}
}
fn test_calling_bench_db_circles<L: ModuleLanguage>() {
L::get_module().with_module_async(DEFAULT_CONFIG, |module| async move {
#[rustfmt::skip]
let benches = [
("insert_bulk_food", 50, "INSERT FOOD: 50"),
("insert_bulk_entity", 50, "INSERT ENTITY: 50"),
("insert_bulk_circle", 500, "INSERT CIRCLE: 500"),
("cross_join_circle_food", 50 * 500, "CROSS JOIN CIRCLE FOOD: 25000, processed: 2500"),
("cross_join_all", 50 * 50 * 500, "CROSS JOIN ALL: 1250000, processed: 1250000"),
];
_run_bench_db(module, &benches).await
});
}
#[test]
#[serial]
fn test_calling_bench_db_circles_rust() {
test_calling_bench_db_circles::<Rust>();
}
#[test]
#[serial]
fn test_calling_bench_db_circles_csharp() {
test_calling_bench_db_circles::<Csharp>();
}
#[test]
#[serial]
fn test_calling_bench_db_circles_typescript() {
test_calling_bench_db_circles::<TypeScript>();
}
fn test_calling_bench_db_ia_loop<L: ModuleLanguage>() {
L::get_module().with_module_async(DEFAULT_CONFIG, |module| async move {
#[rustfmt::skip]
let benches = [
("insert_bulk_position", 20_000, "INSERT POSITION: 20000"),
("insert_bulk_velocity", 10_000, "INSERT VELOCITY: 10000"),
("update_position_all", 20_000, "UPDATE POSITION ALL: 20000, processed: 20000"),
("update_position_with_velocity", 10_000, "UPDATE POSITION BY VELOCITY: 10000, processed: 10000"),
("insert_world", 5_000, "INSERT WORLD PLAYERS: 5000"),
// Note: we set lower amount of ia loop players here than in benchmarks.
// Otherwise tests will take forever because they are built in debug mode.
("game_loop_enemy_ia", 100, "ENEMY IA LOOP PLAYERS: 100, processed: 5000"),
];
_run_bench_db(module, &benches).await
});
}
#[test]
#[serial]
fn test_calling_bench_db_ia_loop_rust() {
test_calling_bench_db_ia_loop::<Rust>();
}
#[test]
#[serial]
fn test_calling_bench_db_ia_loop_csharp() {
test_calling_bench_db_ia_loop::<Csharp>();
}
#[test]
#[serial]
fn test_calling_bench_db_ia_loop_typescript() {
test_calling_bench_db_ia_loop::<TypeScript>();
}