From 1963cd8e9869fa637e67bcacaa06100f789451a7 Mon Sep 17 00:00:00 2001 From: Noa Date: Thu, 17 Apr 2025 12:19:52 -0500 Subject: [PATCH] Monotonic timestamps (#2618) --- crates/core/src/host/instance_env.rs | 23 +- crates/core/src/host/scheduler.rs | 201 ++++++++++-------- .../src/host/wasmtime/wasm_instance_env.rs | 4 +- .../core/src/host/wasmtime/wasmtime_module.rs | 2 +- crates/lib/src/scheduler.rs | 21 +- crates/sats/src/time_duration.rs | 8 +- 6 files changed, 154 insertions(+), 105 deletions(-) diff --git a/crates/core/src/host/instance_env.rs b/crates/core/src/host/instance_env.rs index e6ce0ec2f0..3512fd6194 100644 --- a/crates/core/src/host/instance_env.rs +++ b/crates/core/src/host/instance_env.rs @@ -7,6 +7,7 @@ use crate::replica_context::ReplicaContext; use core::mem; use parking_lot::{Mutex, MutexGuard}; use smallvec::SmallVec; +use spacetimedb_lib::Timestamp; use spacetimedb_primitives::{ColId, ColList, IndexId, TableId}; use spacetimedb_sats::{ bsatn::{self, ToBsatn}, @@ -23,6 +24,8 @@ pub struct InstanceEnv { pub replica_ctx: Arc, pub scheduler: Scheduler, pub tx: TxSlot, + /// The timestamp the current reducer began running. + pub start_time: Timestamp, } #[derive(Clone, Default)] @@ -168,9 +171,15 @@ impl InstanceEnv { replica_ctx, scheduler, tx: TxSlot::default(), + start_time: Timestamp::now(), } } + /// Signal to this `InstanceEnv` that a reducer call is beginning. + pub fn start_reducer(&mut self, ts: Timestamp) { + self.start_time = ts; + } + fn get_tx(&self) -> Result + '_, GetTxError> { self.tx.get() } @@ -260,7 +269,14 @@ impl InstanceEnv { // as we successfully inserted and thus `ret` is verified against the table schema. 
.map_err(|e| NodesError::ScheduleError(ScheduleError::DecodingError(e)))?; self.scheduler - .schedule(table_id, schedule_id, schedule_at, id_column, at_column) + .schedule( + table_id, + schedule_id, + schedule_at, + id_column, + at_column, + self.start_time, + ) .map_err(NodesError::ScheduleError)?; Ok(()) @@ -490,6 +506,8 @@ impl From for NodesError { #[cfg(test)] mod test { + use super::*; + use std::{ops::Bound, sync::Arc}; use crate::{ @@ -515,8 +533,6 @@ mod test { use spacetimedb_sats::product; use tempfile::TempDir; - use super::{ChunkPool, InstanceEnv, TxSlot}; - /// An `InstanceEnv` requires a `DatabaseLogger` fn temp_logger() -> Result { let temp = TempDir::new()?; @@ -559,6 +575,7 @@ mod test { replica_ctx: Arc::new(replica_ctx(db)?), scheduler, tx: TxSlot::default(), + start_time: Timestamp::now(), }) } diff --git a/crates/core/src/host/scheduler.rs b/crates/core/src/host/scheduler.rs index 22be37bc5e..c3ca8cd919 100644 --- a/crates/core/src/host/scheduler.rs +++ b/crates/core/src/host/scheduler.rs @@ -12,8 +12,8 @@ use spacetimedb_primitives::{ColId, TableId}; use spacetimedb_sats::{bsatn::ToBsatn as _, AlgebraicValue}; use spacetimedb_table::table::RowRef; use tokio::sync::mpsc; -use tokio_util::time::delay_queue::Expired; -use tokio_util::time::{delay_queue, DelayQueue}; +use tokio::time::Instant; +use tokio_util::time::delay_queue::{self, DelayQueue, Expired}; use crate::db::datastore::locking_tx_datastore::MutTxId; use crate::db::datastore::system_tables::{StFields, StScheduledFields, ST_SCHEDULED_ID}; @@ -51,8 +51,17 @@ enum MsgOrExit { } enum SchedulerMessage { - Schedule { id: ScheduledReducerId, at: ScheduleAt }, - ScheduleImmediate { reducer_name: String, args: ReducerArgs }, + Schedule { + id: ScheduledReducerId, + /// The timestamp we'll tell the reducer it is. + effective_at: Timestamp, + /// The actual instant we're scheduling for. 
+ real_at: Instant, + }, + ScheduleImmediate { + reducer_name: String, + args: ReducerArgs, + }, } pub struct ScheduledReducer { @@ -102,20 +111,22 @@ impl SchedulerStarter { .table_scheduled_id_and_at(&tx, table_id)? .ok_or_else(|| anyhow!("scheduled table {table_id} doesn't have valid columns"))?; + let now_ts = Timestamp::now(); + let now_instant = Instant::now(); + // Insert each entry (row) in the scheduled table into `queue`. for scheduled_row in self.db.iter(&tx, table_id)? { let (schedule_id, schedule_at) = get_schedule_from_row(&scheduled_row, id_column, at_column)?; // calculate duration left to call the scheduled reducer - let duration = schedule_at.to_duration_from_now(); - queue.insert( - QueueItem::Id(ScheduledReducerId { - table_id, - schedule_id, - id_column, - at_column, - }), - duration, - ); + let duration = schedule_at.to_duration_from(now_ts); + let at = schedule_at.to_timestamp_from(now_ts); + let id = ScheduledReducerId { + table_id, + schedule_id, + id_column, + at_column, + }; + queue.insert_at(QueueItem::Id { id, at }, now_instant + duration); } } @@ -158,7 +169,7 @@ impl SchedulerStarter { /// If `DelayQueue` extends to support a larger range, /// we may reject some long-delayed schedule calls which could succeed, /// but we will never permit a schedule attempt which will panic. -const MAX_SCHEDULE_DELAY: std::time::Duration = std::time::Duration::from_millis( +const MAX_SCHEDULE_DELAY: Duration = Duration::from_millis( // Equal to 64^6 - 1 milliseconds, which is 2.177589 years. (1 << (6 * 6)) - 1, ); @@ -173,6 +184,9 @@ pub enum ScheduleError { } impl Scheduler { + /// Schedule a reducer to run from a scheduled table. + /// + /// `reducer_start` is the timestamp of the start of the current reducer. 
pub(super) fn schedule( &self, table_id: TableId, @@ -180,33 +194,26 @@ impl Scheduler { schedule_at: ScheduleAt, id_column: ColId, at_column: ColId, + reducer_start: Timestamp, ) -> Result<(), ScheduleError> { - // Check that `at` is within `tokio_utils::time::DelayQueue`'s accepted time-range. + // if `Timestamp::now()` is properly monotonic, use it; otherwise, use + // the start of the reducer run as "now" for purposes of scheduling + let now = reducer_start.max(Timestamp::now()); + + // Check that `at` is within `tokio_util::time::DelayQueue`'s + // accepted time-range. // - // `DelayQueue` uses a sliding window, - // and there may be some non-zero delay between this check - // and the actual call to `DelayQueue::insert`. + // `DelayQueue` uses a sliding window, and there may be some non-zero + // delay between this check and the actual call to `DelayQueue::insert_at`. // - // Assuming a monotonic clock, - // this means we may reject some otherwise acceptable schedule calls. - // - // If `Timestamp::now()`, i.e. `std::time::SystemTime::now()`, - // is not monotonic, - // `DelayQueue::insert` may panic. - // This will happen if a module attempts to schedule a reducer - // with a delay just before the two-year limit, - // and the system clock is adjusted backwards - // after the check but before scheduling so that after the adjustment, - // the delay is beyond the two-year limit. - // - // We could avoid this edge case by scheduling in terms of the monotonic `Instant`, - // rather than `SystemTime`, - // but we don't currently have a meaningful way - // to convert a `Timestamp` into an `Instant`. - let delay = schedule_at.to_duration_from_now(); + // Assuming a monotonic clock, this means we may reject some otherwise + // acceptable schedule calls.
+ let delay = schedule_at.to_duration_from(now); if delay >= MAX_SCHEDULE_DELAY { return Err(ScheduleError::DelayTooLong(delay)); } + let effective_at = schedule_at.to_timestamp_from(now); + let real_at = Instant::now() + delay; // if the actor has exited, it's fine to ignore; it means that the host actor calling // schedule will exit soon as well, and it'll be scheduled to run when the module host restarts @@ -217,7 +224,8 @@ impl Scheduler { id_column, at_column, }, - at: schedule_at, + effective_at, + real_at, })); Ok(()) @@ -247,10 +255,13 @@ struct SchedulerActor { } enum QueueItem { - Id(ScheduledReducerId), + Id { id: ScheduledReducerId, at: Timestamp }, VolatileNonatomicImmediate { reducer_name: String, args: ReducerArgs }, } +#[cfg(target_pointer_width = "64")] +spacetimedb_table::static_assert_size!(QueueItem, 64); + impl SchedulerActor { async fn run(mut self) { loop { @@ -270,12 +281,16 @@ impl SchedulerActor { fn handle_message(&mut self, msg: SchedulerMessage) { match msg { - SchedulerMessage::Schedule { id, at } => { + SchedulerMessage::Schedule { + id, + effective_at, + real_at, + } => { // Incase of row update, remove the existing entry from queue first if let Some(key) = self.key_map.get(&id) { self.queue.remove(key); } - let key = self.queue.insert(QueueItem::Id(id), at.to_duration_from_now()); + let key = self.queue.insert_at(QueueItem::Id { id, at: effective_at }, real_at); self.key_map.insert(id, key); } SchedulerMessage::ScheduleImmediate { reducer_name, args } => { @@ -290,7 +305,7 @@ impl SchedulerActor { async fn handle_queued(&mut self, id: Expired) { let item = id.into_inner(); let id = match item { - QueueItem::Id(id) => Some(id), + QueueItem::Id { id, .. } => Some(id), QueueItem::VolatileNonatomicImmediate { .. 
} => None, }; if let Some(id) = id { @@ -304,58 +319,60 @@ impl SchedulerActor { let caller_identity = module_host.info().database_identity; let module_info = module_host.info.clone(); - let call_reducer_params = move |tx: &MutTxId| -> Result, anyhow::Error> { - let id = match item { - QueueItem::Id(id) => id, - QueueItem::VolatileNonatomicImmediate { reducer_name, args } => { - let (reducer_id, reducer_seed) = module_info - .module_def - .reducer_arg_deserialize_seed(&reducer_name[..]) - .ok_or_else(|| anyhow!("Reducer not found: {}", reducer_name))?; - let reducer_args = args.into_tuple(reducer_seed)?; + let call_reducer_params = move |tx: &MutTxId| match item { + QueueItem::Id { id, at } => { + let Ok(schedule_row) = get_schedule_row_mut(tx, &db, id) else { + // if the row is not found, it means the schedule is cancelled by the user + log::debug!( + "table row corresponding to yeild scheduler id not found: tableid {}, schedulerId {}", + id.table_id, + id.schedule_id + ); + return Ok(None); + }; - return Ok(Some(CallReducerParams { - timestamp: Timestamp::now(), - caller_identity, - caller_connection_id: ConnectionId::ZERO, - client: None, - request_id: None, - timer: None, - reducer_id, - args: reducer_args, - })); - } - }; + let ScheduledReducer { reducer, bsatn_args } = proccess_schedule(tx, &db, id.table_id, &schedule_row)?; - let Ok(schedule_row) = get_schedule_row_mut(tx, &db, id) else { - // if the row is not found, it means the schedule is cancelled by the user - log::debug!( - "table row corresponding to yeild scheduler id not found: tableid {}, schedulerId {}", - id.table_id, - id.schedule_id - ); - return Ok(None); - }; + let (reducer_id, reducer_seed) = module_info + .module_def + .reducer_arg_deserialize_seed(&reducer[..]) + .ok_or_else(|| anyhow!("Reducer not found: {}", reducer))?; - let ScheduledReducer { reducer, bsatn_args } = proccess_schedule(tx, &db, id.table_id, &schedule_row)?; + let reducer_args = 
ReducerArgs::Bsatn(bsatn_args.into()).into_tuple(reducer_seed)?; - let (reducer_id, reducer_seed) = module_info - .module_def - .reducer_arg_deserialize_seed(&reducer[..]) - .ok_or_else(|| anyhow!("Reducer not found: {}", reducer))?; + // the timestamp we tell the reducer it's running at will be + // at least the timestamp it was scheduled to run at. + let timestamp = at.max(Timestamp::now()); - let reducer_args = ReducerArgs::Bsatn(bsatn_args.into()).into_tuple(reducer_seed)?; + Ok(Some(CallReducerParams { + timestamp, + caller_identity, + caller_connection_id: ConnectionId::ZERO, + client: None, + request_id: None, + timer: None, + reducer_id, + args: reducer_args, + })) + } + QueueItem::VolatileNonatomicImmediate { reducer_name, args } => { + let (reducer_id, reducer_seed) = module_info + .module_def + .reducer_arg_deserialize_seed(&reducer_name[..]) + .ok_or_else(|| anyhow!("Reducer not found: {}", reducer_name))?; + let reducer_args = args.into_tuple(reducer_seed)?; - Ok(Some(CallReducerParams { - timestamp: Timestamp::now(), - caller_identity, - caller_connection_id: ConnectionId::ZERO, - client: None, - request_id: None, - timer: None, - reducer_id, - args: reducer_args, - })) + Ok(Some(CallReducerParams { + timestamp: Timestamp::now(), + caller_identity, + caller_connection_id: ConnectionId::ZERO, + client: None, + request_id: None, + timer: None, + reducer_id, + args: reducer_args, + })) + } }; let db = module_host.replica_ctx().relational_db.clone(); @@ -391,9 +408,13 @@ impl SchedulerActor { let schedule_at = read_schedule_at(schedule_row, id.at_column)?; if let ScheduleAt::Interval(dur) = schedule_at { - let key = self - .queue - .insert(QueueItem::Id(id), dur.to_duration().unwrap_or(Duration::ZERO)); + let key = self.queue.insert( + QueueItem::Id { + id, + at: Timestamp::now() + dur, + }, + dur.to_duration().unwrap_or(Duration::ZERO), + ); self.key_map.insert(id, key); Ok(true) } else { diff --git a/crates/core/src/host/wasmtime/wasm_instance_env.rs 
b/crates/core/src/host/wasmtime/wasm_instance_env.rs index e923790731..045e1fa340 100644 --- a/crates/core/src/host/wasmtime/wasm_instance_env.rs +++ b/crates/core/src/host/wasmtime/wasm_instance_env.rs @@ -12,6 +12,7 @@ use crate::host::wasm_common::{ }; use crate::host::AbiCall; use anyhow::Context as _; +use spacetimedb_lib::Timestamp; use spacetimedb_primitives::{errno, ColId}; use wasmtime::{AsContext, Caller, StoreContextMut}; @@ -139,7 +140,7 @@ impl WasmInstanceEnv { /// /// Returns the handle used by reducers to read from `args` /// as well as the handle used to write the error message, if any. - pub fn start_reducer(&mut self, name: &str, args: bytes::Bytes) -> (u32, u32) { + pub fn start_reducer(&mut self, name: &str, args: bytes::Bytes, ts: Timestamp) -> (u32, u32) { let errors = self.setup_standard_bytes_sink(); // Pass an invalid source when the reducer args were empty. @@ -153,6 +154,7 @@ impl WasmInstanceEnv { self.reducer_start = Instant::now(); name.clone_into(&mut self.reducer_name); + self.instance_env.start_reducer(ts); (args, errors) } diff --git a/crates/core/src/host/wasmtime/wasmtime_module.rs b/crates/core/src/host/wasmtime/wasmtime_module.rs index c728177d63..552f797543 100644 --- a/crates/core/src/host/wasmtime/wasmtime_module.rs +++ b/crates/core/src/host/wasmtime/wasmtime_module.rs @@ -199,7 +199,7 @@ impl module_host_actor::WasmInstance for WasmtimeInstance { let [conn_id_0, conn_id_1] = bytemuck::must_cast(op.caller_connection_id.as_le_byte_array()); // Prepare arguments to the reducer + the error sink & start timings. 
- let (args_source, errors_sink) = store.data_mut().start_reducer(op.name, op.arg_bytes); + let (args_source, errors_sink) = store.data_mut().start_reducer(op.name, op.arg_bytes, op.timestamp); let call_result = self.call_reducer.call( &mut *store, diff --git a/crates/lib/src/scheduler.rs b/crates/lib/src/scheduler.rs index fa20dcab24..1ec0187f12 100644 --- a/crates/lib/src/scheduler.rs +++ b/crates/lib/src/scheduler.rs @@ -31,14 +31,10 @@ impl ScheduleAt { /// /// Returns [`std::time::Duration::ZERO`] if `self` represents a time in the past. #[cfg(not(all(target_arch = "wasm32", target_os = "unknown")))] - pub fn to_duration_from_now(&self) -> std::time::Duration { - use std::time::{Duration, SystemTime}; + pub fn to_duration_from(&self, from: Timestamp) -> std::time::Duration { + use std::time::Duration; match self { - ScheduleAt::Time(time) => { - let now = SystemTime::now(); - let time = SystemTime::from(*time); - time.duration_since(now).unwrap_or(Duration::ZERO) - } + ScheduleAt::Time(time) => time.duration_since(from).unwrap_or(Duration::ZERO), // TODO(correctness): Determine useful behavior on negative intervals, // as that's the case where `to_duration` fails. // Currently, we use the magnitude / absolute value, @@ -47,6 +43,17 @@ impl ScheduleAt { } } + /// Converts the `ScheduleAt` to a `Timestamp`. + /// + /// If `self` is a time, returns it unchanged; if `self` is an interval, returns `from + dur.abs()` (the magnitude of the interval is used). + #[cfg(not(all(target_arch = "wasm32", target_os = "unknown")))] + pub fn to_timestamp_from(&self, from: Timestamp) -> Timestamp { + match *self { + ScheduleAt::Time(time) => time, + ScheduleAt::Interval(dur) => from + dur.abs(), + } + } + /// Get the special `AlgebraicType` for `ScheduleAt`.
pub fn get_type() -> AlgebraicType { AlgebraicType::sum([ diff --git a/crates/sats/src/time_duration.rs b/crates/sats/src/time_duration.rs index 430701ef9e..bd8d4543fb 100644 --- a/crates/sats/src/time_duration.rs +++ b/crates/sats/src/time_duration.rs @@ -31,9 +31,6 @@ impl TimeDuration { } /// Construct a [`TimeDuration`] which is `micros` microseconds. - /// - /// A positive value means a time after the Unix epoch, - /// and a negative value means a time before. pub fn from_micros(micros: i64) -> Self { Self { __time_duration_micros__: micros, @@ -59,6 +56,11 @@ impl TimeDuration { } } + /// Returns a positive `TimeDuration` with the magnitude of `self`. + pub fn abs(self) -> Self { + Self::from_micros(self.to_micros().saturating_abs()) + } + /// Return a [`TimeDuration`] which represents the same span as `duration`. /// /// Panics if `duration.as_micros` overflows an `i64`