Merge remote-tracking branch 'origin/master' into phoebe/parallelism-test/sequential-eval-incr

This commit is contained in:
Phoebe Goldman
2025-04-04 09:56:04 -04:00
13 changed files with 344 additions and 16 deletions
+2
View File
@@ -0,0 +1,2 @@
# Compile every crate in the workspace with `--cfg tokio_unstable`,
# so code gated on `#[cfg(tokio_unstable)]` (e.g. the extra tokio runtime metrics) is built.
[build]
rustflags = ["--cfg", "tokio_unstable"]
+13
View File
@@ -191,6 +191,19 @@ jobs:
dotnet tool restore
dotnet csharpier --check .
- name: Run `cargo doc` for bindings crate
# `bindings` is the only crate we care strongly about documenting,
# since we link to its docs.rs from our website.
# We won't pass `--no-deps`, though,
# since we want everything reachable through it to also work.
# This includes `sats` and `lib`.
working-directory: crates/bindings
env:
# Make `cargo doc` exit with error on warnings, most notably broken links
RUSTDOCFLAGS: '--deny warnings'
run: |
cargo doc
wasm_bindings:
name: Build and test wasm bindings
runs-on: spacetimedb-runner
+1
View File
@@ -310,3 +310,4 @@ features = [
"broadcast",
"ondemand",
]
@@ -357,10 +357,13 @@ public record struct Timestamp(long MicrosecondsSinceUnixEpoch)
public readonly TimeSpan ToTimeSpanSinceUnixEpoch() => (TimeSpan)ToTimeDurationSinceUnixEpoch();
public readonly TimeDuration TimeDurationSince(Timestamp earlier) =>
new TimeDuration(MicrosecondsSinceUnixEpoch - earlier.MicrosecondsSinceUnixEpoch);
new TimeDuration(checked(MicrosecondsSinceUnixEpoch - earlier.MicrosecondsSinceUnixEpoch));
public static Timestamp operator +(Timestamp point, TimeDuration interval) =>
new Timestamp(point.MicrosecondsSinceUnixEpoch + interval.Microseconds);
new Timestamp(checked(point.MicrosecondsSinceUnixEpoch + interval.Microseconds));
// Moves `point` back in time by `interval`.
// The subtraction is `checked`, so a result outside the range of `long` throws instead of wrapping.
public static Timestamp operator -(Timestamp point, TimeDuration interval)
{
    return new Timestamp(checked(point.MicrosecondsSinceUnixEpoch - interval.Microseconds));
}
public int CompareTo(Timestamp that)
{
@@ -430,6 +433,12 @@ public record struct TimeDuration(long Microseconds) : IStructuralReadWrite
public static implicit operator TimeDuration(TimeSpan timeSpan) =>
new(timeSpan.Ticks / Util.TicksPerMicrosecond);
// Sums two durations.
// The addition is `checked`, so a result outside the range of `long` throws instead of wrapping.
public static TimeDuration operator +(TimeDuration lhs, TimeDuration rhs)
{
    return new TimeDuration(checked(lhs.Microseconds + rhs.Microseconds));
}
// Subtracts `rhs` from `lhs`.
// The subtraction is `checked`, so a result outside the range of `long` throws
// an OverflowException instead of silently wrapping.
public static TimeDuration operator -(TimeDuration lhs, TimeDuration rhs) =>
    new TimeDuration(checked(lhs.Microseconds - rhs.Microseconds));
// For backwards-compatibility.
public readonly TimeSpan ToStd() => this;
+2 -1
View File
@@ -1,6 +1,7 @@
#![doc = include_str!("../README.md")]
// ^ if you are working on docs, go read the top comment of README.md please.
#[cfg(feature = "unstable")]
mod client_visibility_filter;
pub mod log_stopwatch;
mod logger;
@@ -18,7 +19,7 @@ pub use log;
#[cfg(feature = "rand")]
pub use rand;
#[doc(hidden)]
#[cfg(feature = "unstable")]
pub use client_visibility_filter::Filter;
#[cfg(feature = "rand")]
pub use rng::StdbRng;
+5 -1
View File
@@ -119,12 +119,13 @@ tikv-jemalloc-ctl = {workspace = true}
[features]
# Print a warning when doing an unindexed `iter_by_col_range` on a large table.
unindexed_iter_by_col_range_warn = []
# Optional storage engines.
default = ["unindexed_iter_by_col_range_warn"]
# Enable timing for wasm ABI calls
spacetimedb-wasm-instance-env-times = []
# Enable test helpers and utils
test = ["spacetimedb-commitlog/test"]
# Perfmaps for profiling modules
perfmap = []
[dev-dependencies]
spacetimedb-lib = { path = "../lib", features = ["proptest"] }
@@ -141,3 +142,6 @@ pretty_assertions.workspace = true
jsonwebtoken.workspace = true
axum.workspace = true
reqwest.workspace = true
[lints.rust]
unexpected_cfgs = { level = "warn", check-cfg = ['cfg(tokio_unstable)'] }
+7
View File
@@ -31,6 +31,13 @@ impl WasmtimeRuntime {
.consume_fuel(true)
.wasm_backtrace_details(wasmtime::WasmBacktraceDetails::Enable);
// Offer a compile-time flag for enabling perfmap generation,
// so `perf` can display JITted symbol names.
// Ideally we would be able to configure this at runtime via a flag to `spacetime start`,
// but this is good enough for now.
#[cfg(feature = "perfmap")]
config.profiler(wasmtime::ProfilingStrategy::PerfMap);
// ignore errors for this - if we're not able to set up caching, that's fine, it's just an optimization
let _ = Self::set_cache_config(&mut config, data_dir.wasmtime_cache());
+101
View File
@@ -39,6 +39,46 @@ metrics_group!(
#[labels(node_id: str)]
pub jemalloc_resident_bytes: IntGaugeVec,
#[name = tokio_num_workers]
#[help = "Number of core tokio workers"]
#[labels(node_id: str)]
pub tokio_num_workers: IntGaugeVec,
#[name = tokio_num_blocking_threads]
#[help = "Number of extra tokio threads for blocking tasks"]
#[labels(node_id: str)]
pub tokio_num_blocking_threads: IntGaugeVec,
#[name = tokio_num_idle_blocking_threads]
#[help = "Number of tokio blocking threads that are idle"]
#[labels(node_id: str)]
pub tokio_num_idle_blocking_threads: IntGaugeVec,
#[name = tokio_num_alive_tasks]
#[help = "Number of tokio tasks that are still alive"]
#[labels(node_id: str)]
pub tokio_num_alive_tasks: IntGaugeVec,
#[name = tokio_global_queue_depth]
#[help = "Number of tasks in tokios global queue"]
#[labels(node_id: str)]
pub tokio_global_queue_depth: IntGaugeVec,
#[name = tokio_blocking_queue_depth]
#[help = "Number of tasks in tokios blocking task queue"]
#[labels(node_id: str)]
pub tokio_blocking_queue_depth: IntGaugeVec,
#[name = tokio_spawned_tasks_count]
#[help = "Number of tokio tasks spawned"]
#[labels(node_id: str)]
pub tokio_spawned_tasks_count: IntCounterVec,
#[name = tokio_remote_schedule_count]
#[help = "Number of tasks spawned from outside the tokio runtime"]
#[labels(node_id: str)]
pub tokio_remote_schedule_count: IntCounterVec,
#[name = spacetime_websocket_sent_msg_size_bytes]
#[help = "The size of messages sent to connected sessions"]
#[labels(db: Identity, workload: WorkloadType)]
@@ -165,3 +205,64 @@ pub fn spawn_jemalloc_stats(node_id: String) {
});
});
}
// How frequently to update the tokio stats.
const TOKIO_STATS_INTERVAL: Duration = Duration::from_secs(10);
static SPAWN_TOKIO_STATS_GUARD: Once = Once::new();
/// Spawns a background task that periodically copies tokio runtime metrics into `WORKER_METRICS`.
///
/// Every metric is labelled with `node_id`, and is refreshed once per `TOKIO_STATS_INTERVAL`.
/// The task is started through `SPAWN_TOKIO_STATS_GUARD`, so only the first call spawns it;
/// later calls do nothing.
///
/// Metrics which tokio only exposes under `--cfg tokio_unstable` are recorded
/// only when this crate is compiled with that cfg.
pub fn spawn_tokio_stats(node_id: String) {
    SPAWN_TOKIO_STATS_GUARD.call_once(|| {
        spawn(async move {
            // Set up our metric handles, so we don't keep calling `with_label_values`.
            let num_worker_metric = WORKER_METRICS.tokio_num_workers.with_label_values(&node_id);
            let num_blocking_threads_metric = WORKER_METRICS.tokio_num_blocking_threads.with_label_values(&node_id);
            let num_alive_tasks_metric = WORKER_METRICS.tokio_num_alive_tasks.with_label_values(&node_id);
            let global_queue_depth_metric = WORKER_METRICS.tokio_global_queue_depth.with_label_values(&node_id);
            let num_idle_blocking_threads_metric = WORKER_METRICS
                .tokio_num_idle_blocking_threads
                .with_label_values(&node_id);
            let blocking_queue_depth_metric = WORKER_METRICS.tokio_blocking_queue_depth.with_label_values(&node_id);
            let spawned_tasks_count_metric = WORKER_METRICS.tokio_spawned_tasks_count.with_label_values(&node_id);
            let remote_schedule_count_metric = WORKER_METRICS.tokio_remote_schedule_count.with_label_values(&node_id);

            loop {
                let metrics = tokio::runtime::Handle::current().metrics();

                // These gauges are available on stable tokio.
                num_worker_metric.set(metrics.num_workers() as i64);
                num_alive_tasks_metric.set(metrics.num_alive_tasks() as i64);
                global_queue_depth_metric.set(metrics.global_queue_depth() as i64);

                // These gauges need tokio's unstable metrics API.
                #[cfg(tokio_unstable)]
                {
                    num_blocking_threads_metric.set(metrics.num_blocking_threads() as i64);
                    num_idle_blocking_threads_metric.set(metrics.num_idle_blocking_threads() as i64);
                    blocking_queue_depth_metric.set(metrics.blocking_queue_depth() as i64);
                }

                // The spawned tasks count and remote schedule count are cumulative,
                // so we need to increment them by the difference from the last value.
                #[cfg(all(target_has_atomic = "64", tokio_unstable))]
                {
                    {
                        let current_count = metrics.spawned_tasks_count();
                        let previous_value = spawned_tasks_count_metric.get();
                        // The tokio metric should be monotonically increasing, but we are checking just in case.
                        if let Some(diff) = current_count.checked_sub(previous_value) {
                            spawned_tasks_count_metric.inc_by(diff);
                        }
                    }
                    {
                        let current_count = metrics.remote_schedule_count();
                        let previous_value = remote_schedule_count_metric.get();
                        // The tokio metric should be monotonically increasing, but we are checking just in case.
                        if let Some(diff) = current_count.checked_sub(previous_value) {
                            remote_schedule_count_metric.inc_by(diff);
                        }
                    }
                }

                // TODO: Consider adding some of the worker metrics as well, like overflows, steals, etc.

                // The sleep must not be behind any `cfg`:
                // without it this loop would spin without ever yielding to the runtime.
                sleep(TOKIO_STATS_INTERVAL).await;
            }
        });
    });
}
+4 -4
View File
@@ -121,22 +121,22 @@ impl ProductType {
self.is_i64_newtype(TIME_DURATION_TAG)
}
/// Returns whether this is the special tag of [`Identity`].
/// Returns whether this is the special tag of `Identity`.
pub fn is_identity_tag(tag_name: &str) -> bool {
tag_name == IDENTITY_TAG
}
/// Returns whether this is the special tag of [`ConnectionId`].
/// Returns whether this is the special tag of `ConnectionId`.
pub fn is_connection_id_tag(tag_name: &str) -> bool {
tag_name == CONNECTION_ID_TAG
}
/// Returns whether this is the special tag of [`Timestamp`].
/// Returns whether this is the special tag of [`crate::timestamp::Timestamp`].
pub fn is_timestamp_tag(tag_name: &str) -> bool {
tag_name == TIMESTAMP_TAG
}
/// Returns whether this is the special tag of [`TimeDuration`].
/// Returns whether this is the special tag of [`crate::time_duration::TimeDuration`].
pub fn is_time_duration_tag(tag_name: &str) -> bool {
tag_name == TIME_DURATION_TAG
}
+79
View File
@@ -1,6 +1,7 @@
use crate::timestamp::MICROSECONDS_PER_SECOND;
use crate::{de::Deserialize, impl_st, ser::Serialize, AlgebraicType, AlgebraicValue};
use std::fmt;
use std::ops::{Add, AddAssign, Sub, SubAssign};
use std::time::Duration;
#[derive(Eq, PartialEq, Ord, PartialOrd, Copy, Clone, Hash, Serialize, Deserialize, Debug)]
@@ -69,6 +70,16 @@ impl TimeDuration {
.expect("Duration overflows i64 microseconds"),
)
}
/// Adds `other` to `self`, yielding `None` when the sum would be out of bounds for [`TimeDuration`].
pub fn checked_add(self, other: Self) -> Option<Self> {
    let total_micros = self.to_micros().checked_add(other.to_micros())?;
    Some(Self::from_micros(total_micros))
}
/// Subtracts `other` from `self`, yielding `None` when the difference would be out of bounds for [`TimeDuration`].
pub fn checked_sub(self, other: Self) -> Option<Self> {
    let remaining_micros = self.to_micros().checked_sub(other.to_micros())?;
    Some(Self::from_micros(remaining_micros))
}
}
impl From<Duration> for TimeDuration {
@@ -96,6 +107,40 @@ impl fmt::Display for TimeDuration {
}
}
impl Add for TimeDuration {
    type Output = Self;

    /// Returns `self + rhs`.
    ///
    /// # Panics
    ///
    /// Panics if the sum is out of bounds for [`TimeDuration`],
    /// i.e. whenever [`TimeDuration::checked_add`] returns `None`.
    fn add(self, rhs: Self) -> Self::Output {
        self.checked_add(rhs)
            .expect("overflow when adding `TimeDuration`s")
    }
}
impl Sub for TimeDuration {
    type Output = Self;

    /// Returns `self - rhs`.
    ///
    /// # Panics
    ///
    /// Panics if the difference is out of bounds for [`TimeDuration`],
    /// i.e. whenever [`TimeDuration::checked_sub`] returns `None`.
    fn sub(self, rhs: Self) -> Self::Output {
        self.checked_sub(rhs)
            .expect("overflow when subtracting `TimeDuration`s")
    }
}
impl AddAssign for TimeDuration {
    /// Replaces `self` with `self + rhs`; panics in exactly the cases where `+` does.
    fn add_assign(&mut self, rhs: Self) {
        let sum = *self + rhs;
        *self = sum;
    }
}
impl SubAssign for TimeDuration {
    /// Replaces `self` with `self - rhs`; panics in exactly the cases where `-` does.
    fn sub_assign(&mut self, rhs: Self) {
        let difference = *self - rhs;
        *self = difference;
    }
}
// `std::time::Duration` has implementations of `Mul<u32>` and `Div<u32>`,
// plus checked methods and assign traits.
// It also has methods for division with floats,
// both `Duration -> Duration -> float` and `Duration -> float -> Duration`.
// We could provide some or all of these, but so far have not seen the need to.
impl From<TimeDuration> for AlgebraicValue {
fn from(value: TimeDuration) -> Self {
AlgebraicValue::product([value.to_micros().into()])
@@ -134,5 +179,39 @@ mod test {
prop_assert_eq!(time_duration_prime, time_duration);
prop_assert_eq!(time_duration_prime.to_micros(), micros);
}
// Property: arithmetic on `TimeDuration` agrees with checked `i64` arithmetic
// on the microsecond counts the durations were built from.
#[test]
fn arithmetic_as_expected(lhs in any::<i64>(), rhs in any::<i64>()) {
let lhs_time_duration = TimeDuration::from_micros(lhs);
let rhs_time_duration = TimeDuration::from_micros(rhs);
// Addition: when the `i64` sum is representable,
// `checked_add`, `+` and `+=` must all produce exactly that sum.
if let Some(sum) = lhs.checked_add(rhs) {
let sum_time_duration = lhs_time_duration.checked_add(rhs_time_duration);
prop_assert!(sum_time_duration.is_some());
prop_assert_eq!(sum_time_duration.unwrap().to_micros(), sum);
prop_assert_eq!((lhs_time_duration + rhs_time_duration).to_micros(), sum);
let mut sum_assign = lhs_time_duration;
sum_assign += rhs_time_duration;
prop_assert_eq!(sum_assign.to_micros(), sum);
} else {
// On `i64` overflow only `checked_add` is exercised; the operators are not called here.
prop_assert!(lhs_time_duration.checked_add(rhs_time_duration).is_none());
}
// Subtraction: same structure as the addition case above.
if let Some(diff) = lhs.checked_sub(rhs) {
let diff_time_duration = lhs_time_duration.checked_sub(rhs_time_duration);
prop_assert!(diff_time_duration.is_some());
prop_assert_eq!(diff_time_duration.unwrap().to_micros(), diff);
prop_assert_eq!((lhs_time_duration - rhs_time_duration).to_micros(), diff);
let mut diff_assign = lhs_time_duration;
diff_assign -= rhs_time_duration;
prop_assert_eq!(diff_assign.to_micros(), diff);
} else {
// On `i64` overflow only `checked_sub` is exercised; the operators are not called here.
prop_assert!(lhs_time_duration.checked_sub(rhs_time_duration).is_none());
}
}
}
}
+114 -8
View File
@@ -3,7 +3,7 @@ use chrono::DateTime;
use crate::{de::Deserialize, impl_st, ser::Serialize, time_duration::TimeDuration, AlgebraicType, AlgebraicValue};
use std::fmt;
use std::ops::Add;
use std::ops::{Add, AddAssign, Sub, SubAssign};
use std::time::{Duration, SystemTime};
#[derive(Eq, PartialEq, Ord, PartialOrd, Copy, Clone, Hash, Serialize, Deserialize, Debug)]
@@ -140,6 +140,37 @@ impl Timestamp {
.map(Timestamp::from_micros_since_unix_epoch)
}
/// Computes the time `self + duration`.
///
/// Yields `None` when that time cannot be represented as a `Timestamp`,
/// i.e. as a 64-bit signed number of microseconds before or after the Unix epoch.
pub fn checked_add(&self, duration: TimeDuration) -> Option<Self> {
    let offset_micros = duration.to_micros();
    let shifted = self.__timestamp_micros_since_unix_epoch__.checked_add(offset_micros)?;
    Some(Timestamp::from_micros_since_unix_epoch(shifted))
}
/// Computes the time `self - duration`.
///
/// Yields `None` when that time cannot be represented as a `Timestamp`,
/// i.e. as a 64-bit signed number of microseconds before or after the Unix epoch.
pub fn checked_sub(&self, duration: TimeDuration) -> Option<Self> {
    let offset_micros = duration.to_micros();
    let shifted = self.__timestamp_micros_since_unix_epoch__.checked_sub(offset_micros)?;
    Some(Timestamp::from_micros_since_unix_epoch(shifted))
}
/// Adds a [`Duration`] to `self`, yielding `None` if the result would be out-of-bounds for `Timestamp`.
///
/// The `duration` is first converted into a [`TimeDuration`], and the arithmetic is done on that.
/// Depending on the target platform's representation of [`Duration`], the conversion may lose precision.
pub fn checked_add_duration(&self, duration: Duration) -> Option<Self> {
    let as_time_duration = TimeDuration::from_duration(duration);
    self.checked_add(as_time_duration)
}
/// Subtracts a [`Duration`] from `self`, yielding `None` if the result would be out-of-bounds for `Timestamp`.
///
/// The `duration` is first converted into a [`TimeDuration`], and the arithmetic is done on that.
/// Depending on the target platform's representation of [`Duration`], the conversion may lose precision.
pub fn checked_sub_duration(&self, duration: Duration) -> Option<Self> {
    let as_time_duration = TimeDuration::from_duration(duration);
    self.checked_sub(as_time_duration)
}
/// Returns an RFC 3339 and ISO 8601 date and time string such as `1996-12-19T16:39:57-08:00`.
pub fn to_rfc3339(&self) -> anyhow::Result<String> {
DateTime::from_timestamp_micros(self.to_micros_since_unix_epoch())
@@ -153,7 +184,55 @@ impl Add<TimeDuration> for Timestamp {
type Output = Self;
fn add(self, other: TimeDuration) -> Self::Output {
Timestamp::from_micros_since_unix_epoch(self.to_micros_since_unix_epoch() + other.to_micros())
self.checked_add(other).unwrap()
}
}
impl Add<Duration> for Timestamp {
    type Output = Self;

    /// Returns `self + other`, computed via [`Timestamp::checked_add_duration`].
    ///
    /// # Panics
    ///
    /// Panics if `checked_add_duration` returns `None`,
    /// i.e. the result would be out-of-bounds for `Timestamp`.
    // NOTE(review): the `Duration` -> `TimeDuration` conversion inside `checked_add_duration`
    // may itself panic for very large durations; confirm against `TimeDuration::from_duration`.
    fn add(self, other: Duration) -> Self::Output {
        self.checked_add_duration(other)
            .expect("overflow when adding `Duration` to `Timestamp`")
    }
}
impl Sub<TimeDuration> for Timestamp {
    type Output = Self;

    /// Returns `self - other`, computed via [`Timestamp::checked_sub`].
    ///
    /// # Panics
    ///
    /// Panics if `checked_sub` returns `None`,
    /// i.e. the result would be out-of-bounds for `Timestamp`.
    fn sub(self, other: TimeDuration) -> Self::Output {
        self.checked_sub(other)
            .expect("overflow when subtracting `TimeDuration` from `Timestamp`")
    }
}
impl Sub<Duration> for Timestamp {
    type Output = Self;

    /// Returns `self - other`, computed via [`Timestamp::checked_sub_duration`].
    ///
    /// # Panics
    ///
    /// Panics if `checked_sub_duration` returns `None`,
    /// i.e. the result would be out-of-bounds for `Timestamp`.
    // NOTE(review): the `Duration` -> `TimeDuration` conversion inside `checked_sub_duration`
    // may itself panic for very large durations; confirm against `TimeDuration::from_duration`.
    fn sub(self, other: Duration) -> Self::Output {
        self.checked_sub_duration(other)
            .expect("overflow when subtracting `Duration` from `Timestamp`")
    }
}
impl AddAssign<TimeDuration> for Timestamp {
    /// Advances `self` by `other`; panics in exactly the cases where `self + other` does.
    fn add_assign(&mut self, other: TimeDuration) {
        let advanced = *self + other;
        *self = advanced;
    }
}
impl AddAssign<Duration> for Timestamp {
    /// Advances `self` by `other`; panics in exactly the cases where `self + other` does.
    fn add_assign(&mut self, other: Duration) {
        let advanced = *self + other;
        *self = advanced;
    }
}
impl SubAssign<TimeDuration> for Timestamp {
    /// Moves `self` back by `rhs`; panics in exactly the cases where `self - rhs` does.
    fn sub_assign(&mut self, rhs: TimeDuration) {
        let rewound = *self - rhs;
        *self = rewound;
    }
}
impl SubAssign<Duration> for Timestamp {
    /// Moves `self` back by `rhs`; panics in exactly the cases where `self - rhs` does.
    fn sub_assign(&mut self, rhs: Duration) {
        let rewound = *self - rhs;
        *self = rewound;
    }
}
@@ -221,13 +300,40 @@ mod test {
}
#[test]
fn add_duration(since_epoch in any::<i64>().prop_map(|n| n.abs()), duration in any::<i64>()) {
prop_assume!(since_epoch.checked_add(duration).is_some());
fn arithmetic_with_timeduration(lhs in any::<i64>(), rhs in any::<i64>()) {
let lhs_timestamp = Timestamp::from_micros_since_unix_epoch(lhs);
let rhs_time_duration = TimeDuration::from_micros(rhs);
let timestamp = Timestamp::from_micros_since_unix_epoch(since_epoch);
let time_duration = TimeDuration::from_micros(duration);
let result = timestamp + time_duration;
prop_assert_eq!(result.to_micros_since_unix_epoch(), since_epoch + duration);
if let Some(sum) = lhs.checked_add(rhs) {
let sum_timestamp = lhs_timestamp.checked_add(rhs_time_duration);
prop_assert!(sum_timestamp.is_some());
prop_assert_eq!(sum_timestamp.unwrap().to_micros_since_unix_epoch(), sum);
prop_assert_eq!((lhs_timestamp + rhs_time_duration).to_micros_since_unix_epoch(), sum);
let mut sum_assign = lhs_timestamp;
sum_assign += rhs_time_duration;
prop_assert_eq!(sum_assign.to_micros_since_unix_epoch(), sum);
} else {
prop_assert!(lhs_timestamp.checked_add(rhs_time_duration).is_none());
}
if let Some(diff) = lhs.checked_sub(rhs) {
let diff_timestamp = lhs_timestamp.checked_sub(rhs_time_duration);
prop_assert!(diff_timestamp.is_some());
prop_assert_eq!(diff_timestamp.unwrap().to_micros_since_unix_epoch(), diff);
prop_assert_eq!((lhs_timestamp - rhs_time_duration).to_micros_since_unix_epoch(), diff);
let mut diff_assign = lhs_timestamp;
diff_assign -= rhs_time_duration;
prop_assert_eq!(diff_assign.to_micros_since_unix_epoch(), diff);
} else {
prop_assert!(lhs_timestamp.checked_sub(rhs_time_duration).is_none());
}
}
// TODO: determine what guarantees we provide for arithmetic with `Duration`,
// then write tests that we uphold said guarantees.
}
}
+4
View File
@@ -15,6 +15,10 @@ proc-macro = false # Set to `true` for a proc-macro library.
harness = true # Use libtest harness.
required-features = [] # Features required to build this target (N/A for lib)
[features]
# Perfmaps for profiling modules
perfmap = ["spacetimedb-core/perfmap"]
[dependencies]
spacetimedb-client-api-messages.workspace = true
spacetimedb-client-api.workspace = true
@@ -135,6 +135,7 @@ pub async fn exec(args: &ArgMatches) -> anyhow::Result<()> {
let data_dir = Arc::new(data_dir.clone());
let ctx = StandaloneEnv::init(db_config, &certs, data_dir).await?;
worker_metrics::spawn_jemalloc_stats(listen_addr.clone());
worker_metrics::spawn_tokio_stats(listen_addr.clone());
let mut db_routes = DatabaseRoutes::default();
db_routes.root_post = db_routes.root_post.layer(DefaultBodyLimit::disable());