From d311525e455f7f7632b04e740edadb43172ae00a Mon Sep 17 00:00:00 2001 From: Shubham Mishra Date: Wed, 12 Jun 2024 22:45:12 +0530 Subject: [PATCH] finish delaqueue logic --- crates/client-api/src/routes/database.rs | 11 +- crates/core/src/client/client_connection.rs | 1 + crates/core/src/host/instance_env.rs | 3 +- crates/core/src/host/module_host.rs | 12 +- crates/core/src/host/scheduler.rs | 240 +++++++++++------- .../src/host/wasm_common/module_host_actor.rs | 4 +- crates/lib/src/scheduler.rs | 7 +- 7 files changed, 173 insertions(+), 105 deletions(-) diff --git a/crates/client-api/src/routes/database.rs b/crates/client-api/src/routes/database.rs index 06a04eb89..ec94f2cb9 100644 --- a/crates/client-api/src/routes/database.rs +++ b/crates/client-api/src/routes/database.rs @@ -106,7 +106,16 @@ pub async fn call( return Err((StatusCode::NOT_FOUND, format!("{:#}", anyhow::anyhow!(e))).into()); } let result = match module - .call_reducer(caller_identity, Some(client_address), None, None, None, &reducer, args) + .call_reducer( + None, + caller_identity, + Some(client_address), + None, + None, + None, + &reducer, + args, + ) .await { Ok(rcr) => Ok(rcr), diff --git a/crates/core/src/client/client_connection.rs b/crates/core/src/client/client_connection.rs index be5cdab40..7cc9321bf 100644 --- a/crates/core/src/client/client_connection.rs +++ b/crates/core/src/client/client_connection.rs @@ -233,6 +233,7 @@ impl ClientConnection { ) -> Result { self.module .call_reducer( + None, self.id.identity, Some(self.id.address), Some(self.sender()), diff --git a/crates/core/src/host/instance_env.rs b/crates/core/src/host/instance_env.rs index 2baa0eb66..7021017e7 100644 --- a/crates/core/src/host/instance_env.rs +++ b/crates/core/src/host/instance_env.rs @@ -113,7 +113,8 @@ impl InstanceEnv { args: Vec, time: Timestamp, ) -> Result { - self.scheduler.schedule(reducer, args, time) + self.scheduler.schedule(reducer, args, time); + todo!() } #[tracing::instrument(skip_all)] diff 
--git a/crates/core/src/host/module_host.rs b/crates/core/src/host/module_host.rs index 80b88a641..176651cc2 100644 --- a/crates/core/src/host/module_host.rs +++ b/crates/core/src/host/module_host.rs @@ -391,7 +391,7 @@ pub trait ModuleInstance: Send + 'static { fn update_database(&mut self, fence: u128) -> anyhow::Result; - fn call_reducer(&mut self, params: CallReducerParams) -> ReducerCallResult; + fn call_reducer(&mut self, tx: Option, params: CallReducerParams) -> ReducerCallResult; } pub struct CallReducerParams { @@ -434,8 +434,8 @@ impl ModuleInstance for AutoReplacingModuleInstance { self.check_trap(); ret } - fn call_reducer(&mut self, params: CallReducerParams) -> ReducerCallResult { - let ret = self.inst.call_reducer(params); + fn call_reducer(&mut self, tx: Option, params: CallReducerParams) -> ReducerCallResult { + let ret = self.inst.call_reducer(tx, params); self.check_trap(); ret } @@ -692,6 +692,7 @@ impl ModuleHost { let result = self .call_reducer_inner( + None, caller_identity, Some(caller_address), None, @@ -774,6 +775,7 @@ impl ModuleHost { async fn call_reducer_inner( &self, + tx: Option, caller_identity: Identity, caller_address: Option
, client: Option>, @@ -792,7 +794,7 @@ impl ModuleHost { let caller_address = caller_address.unwrap_or(Address::__DUMMY); self.call(reducer_name, move |inst| { - inst.call_reducer(CallReducerParams { + inst.call_reducer(None, CallReducerParams { timestamp: Timestamp::now(), caller_identity, caller_address, @@ -809,6 +811,7 @@ impl ModuleHost { pub async fn call_reducer( &self, + tx: Option, caller_identity: Identity, caller_address: Option
, client: Option>, @@ -822,6 +825,7 @@ impl ModuleHost { } let res = self .call_reducer_inner( + tx, caller_identity, caller_address, client, diff --git a/crates/core/src/host/scheduler.rs b/crates/core/src/host/scheduler.rs index 9b54d416f..44ba5280e 100644 --- a/crates/core/src/host/scheduler.rs +++ b/crates/core/src/host/scheduler.rs @@ -1,35 +1,34 @@ use std::sync::Arc; +use anyhow::{anyhow, Error}; use futures::StreamExt; use rustc_hash::FxHashMap; -use sled::transaction::{ConflictableTransactionError::Abort as TxAbort, TransactionError}; +use sled::transaction::TransactionError; use spacetimedb_lib::bsatn::ser::BsatnError; -use spacetimedb_lib::bsatn::to_vec; -use spacetimedb_lib::de::Deserialize; use spacetimedb_lib::scheduler::ScheduleAt; -use spacetimedb_lib::{bsatn, AlgebraicType, AlgebraicValue, ProductValue, SumType, Timestamp}; +use spacetimedb_lib::{bsatn, AlgebraicValue, Timestamp}; use spacetimedb_primitives::TableId; -use spacetimedb_sats::algebraic_value::ser::ValueSerializer; -use spacetimedb_sats::satn::Satn; -use spacetimedb_sats::SumValue; -use spacetimedb_table::layout::PrimitiveType; -use sqlparser::ast::Interval; +use spacetimedb_table::table::RowRef; use tokio::sync::mpsc; -use tokio::time::Instant; use tokio_util::time::delay_queue::Expired; use tokio_util::time::{delay_queue, DelayQueue}; -use crate::db::datastore::locking_tx_datastore::state_view::StateView; +use crate::db::datastore::locking_tx_datastore::MutTxId; use crate::db::datastore::system_tables::{StScheduledRow, ST_SCHEDULED_ID}; +use crate::db::datastore::traits::IsolationLevel; use crate::db::relational_db::RelationalDB; +use crate::error::DBError; use crate::execution_context::ExecutionContext; +use crate::messages::control_worker_api::ScheduleUpdate; use super::module_host::WeakModuleHost; -use super::{ModuleHost, ReducerArgs, ReducerCallError}; +use super::{ModuleHost, ReducerArgs}; #[derive(Copy, Clone, Eq, PartialEq, Hash)] pub struct ScheduledReducerId { + /// 
Scheduled Table id, this should have an entry in `ST_SCHEDULED` table_id: TableId, + /// Refer to a particular schedule (row) in scheduled table scheduled_id: u64, } @@ -43,10 +42,8 @@ enum SchedulerMessage { Cancel { id: ScheduledReducerId }, } -#[derive(spacetimedb_sats::ser::Serialize, spacetimedb_sats::de::Deserialize)] struct ScheduledReducer { - at: Timestamp, - reducer: String, + reducer: Box, bsatn_args: Vec, } @@ -82,6 +79,24 @@ impl Scheduler { } } +struct StModuleScheduledRow { + scheduled_id: u64, + scheduled_at: ScheduleAt, +} + +impl TryFrom> for StModuleScheduledRow { + type Error = Error; + fn try_from(row: RowRef<'_>) -> Result { + Ok(StModuleScheduledRow { + scheduled_id: row.read_col(0)?, + scheduled_at: row + .read_col::(1)? + .try_into() + .map_err(|_| anyhow!("Error reading scheduled_at"))?, + }) + } +} + impl SchedulerStarter { // TODO(cloutiertyler): This whole start dance is scuffed, but I don't have // time to make it better right now. @@ -93,29 +108,18 @@ impl SchedulerStarter { for row in self.db.iter(&ctx, &tx, ST_SCHEDULED_ID)? { let scheduled_table = StScheduledRow::try_from(row).expect("Error reading scheduled table row"); - // Insert each entry (row) in DelayQueue + // Insert each entry (row) of scheduled tables in DelayQueue for row_ref in self.db.iter(&ctx, &tx, scheduled_table.table_id)? { - // First two columns for schjedile table are fixed i.e `schedule_id` and - // `ScheuleAt` respectivelty + let scheduler: StModuleScheduledRow = row_ref.try_into()?; - let scheduled_id: u64 = row_ref - .read_col::(1) - .map_err(|_| anyhow::anyhow!("Error reading scheduled_at"))?; - - let schedule_at: ScheduleAt = row_ref - .read_col::(2) - .map_err(|_| anyhow::anyhow!("Error reading scheduled_at"))? 
- .try_into() - .map_err(|_| anyhow::anyhow!("Error reading scheduled_at"))?; - - let at_time = match schedule_at { + let at_time = match scheduler.scheduled_at { ScheduleAt::Time(time) => time, ScheduleAt::Interval(dur) => todo!(), }; queue.insert( ScheduledReducerId { table_id: scheduled_table.table_id, - scheduled_id, + scheduled_id: scheduler.scheduled_id, }, at_time.to_duration_from_now(), ); @@ -176,12 +180,7 @@ pub enum ScheduleError { } impl Scheduler { - pub fn schedule( - &self, - reducer: String, - bsatn_args: Vec, - at: Timestamp, - ) -> Result { + pub fn schedule(&self, table_id: TableId, row_id: u64, at: Timestamp) -> Result { // Check that `at` is within `tokio_utils::time::DelayQueue`'s accepted time-range. // // `DelayQueue` uses a sliding window, @@ -209,14 +208,9 @@ impl Scheduler { return Err(ScheduleError::DelayTooLong(at)); } - let reducer = ScheduledReducer { - at, - reducer, - bsatn_args, - }; let id = ScheduledReducerId { - table_id: TableId::default(), - scheduled_id: 0, + table_id, + scheduled_id: row_id, }; // if the actor has exited, it's fine to ignore; it means that the host actor calling // schedule will exit soon as well, and it'll be scheduled to run when the module host restarts @@ -225,21 +219,7 @@ impl Scheduler { } pub fn cancel(&self, id: ScheduledReducerId) { - // let res = self.db.transaction(|tx| { - // tx.remove(&id.0.to_le_bytes())?; - // Ok(()) - // }); - // match res { - // Ok(()) => { - // // if it's exited it's not gonna run it :) see also the comment in schedule() - // let _ = self.tx.send(MsgOrExit::Msg(SchedulerMessage::Cancel { id })); - // } - // // we could return an error here, but that would give them information that - // // there exists a scheduled reducer with this id. 
like returning a HTTP 400 - // // instead of a 404 - // Err(TransactionError::Abort(())) => {} - // Err(TransactionError::Storage(e)) => panic!("sled error: {e:?}"), - // } + let _ = self.tx.send(MsgOrExit::Msg(SchedulerMessage::Cancel { id })); } pub fn close(&self) { @@ -286,43 +266,113 @@ impl SchedulerActor { } async fn handle_queued(&mut self, id: Expired) { - // let id = id.into_inner(); - // self.key_map.remove(&id); - // let Some(module_host) = self.module_host.upgrade() else { - // return; - // }; - // let Some(scheduled) = self.db.get(id.0.to_le_bytes()).unwrap() else { - // return; - // }; - // let scheduled: ScheduledReducer = bsatn::from_slice(&scheduled).unwrap(); + let id = id.into_inner(); + self.key_map.remove(&id); - // let db = self.db.clone(); - // tokio::spawn(async move { - // let info = module_host.info(); - // let identity = info.identity; - // // TODO: pass a logical "now" timestamp to this reducer call, but there's some - // // intricacies to get right (how much drift to tolerate? what kind of tokio::time::MissedTickBehavior do we want?) - // let res = module_host - // .call_reducer( - // identity, - // // Scheduled reducers take `None` as the caller address. 
- // None, - // None, - // None, - // None, - // &scheduled.reducer, - // ReducerArgs::Bsatn(scheduled.bsatn_args.into()), - // ) - // .await; - // if !matches!(res, Err(ReducerCallError::NoSuchModule(_))) { - // // if we didn't actually call the reducer because the module exited, leave - // // the ScheduledReducer in the database for when the module restarts - // let _ = db.remove(id.0.to_le_bytes()); - // } - // match res { - // Ok(_) => {} - // Err(e) => log::error!("invoking scheduled reducer failed: {e:#}"), - // } - // }); + if let Some(module_host) = self.module_host.upgrade() { + let db = module_host.dbic().relational_db.clone(); + let tx = db.begin_mut_tx(IsolationLevel::Serializable); + let identity = module_host.info().identity; + match self.proccess_schedule(id, &db, &tx) { + Ok((reducer, is_repeated)) => { + tokio::spawn(async move { + let res = module_host + .call_reducer( + Some(tx), + identity, + None, + None, + None, + None, + &reducer.reducer, + ReducerArgs::Bsatn(reducer.bsatn_args.into()), + ) + .await; + + // Delete the entry from table if it is not repeated schedule in new tx + if !is_repeated { + let _ = delete_schedule_from_table(id, &db); + } + if let Err(e) = res { + log::error!("invoking scheduled reducer failed: {e:#}"); + }; + }); + } + Err(e) => { + log::error!("invoking scheduled reducer failed: {e:#}"); + }, + } + } + } + + fn proccess_schedule( + &mut self, + id: ScheduledReducerId, + db: &RelationalDB, + tx: &MutTxId, + ) -> Result<(ScheduledReducer, bool), anyhow::Error> { + let ctx = ExecutionContext::internal(db.address()); + + let scheduled_id = AlgebraicValue::from(id.scheduled_id); + let table_id = id.table_id; + + // Find reducer name from `ST_SCHEDULED` table + let reducer_entry = db + .iter_by_col_eq_mut(&ctx, &tx, ST_SCHEDULED_ID, 0, &AlgebraicValue::from(table_id))? 
+ .next() + .ok_or(anyhow!("scheduled table entry not exist in st_scheduled"))?; + let reducer_name = reducer_entry.read_col::>(0)?; + + // Check if expired schedule exists in the relational_db + let schedule = db + .iter_by_col_eq_mut(&ctx, &tx, table_id, 0, &scheduled_id)? + .next() + .ok_or(anyhow!("scheduler not found in rdb"))?; + let schedule_at = TryInto::::try_into(schedule)?.scheduled_at; + + let reducer_args_av = &schedule.to_product_value().elements[2..0]; + let reducer_arg_bsatn = bsatn::to_vec(&reducer_args_av)?; + + let mut is_repeated = false; + if let ScheduleAt::Interval(dur) = schedule_at { + is_repeated = true; + self.queue.insert( + ScheduledReducerId { + table_id, + scheduled_id: id.scheduled_id, + }, + dur.into(), + ); + } + + Ok(( + ScheduledReducer { + reducer: reducer_name, + bsatn_args: reducer_arg_bsatn, + }, + is_repeated, + )) } } + +fn delete_schedule_from_table(id: ScheduledReducerId, db: &RelationalDB) -> Result<(), anyhow::Error> { + let mut tx = db.begin_mut_tx(IsolationLevel::Serializable); + let ctx = ExecutionContext::internal(db.address()); + + let scheduled_id = AlgebraicValue::from(id.scheduled_id); + let table_id = id.table_id; + + let schedule_row = db + .iter_by_col_eq_mut(&ctx, &mut tx, table_id, 0, &scheduled_id)? 
+ .next() + .ok_or(anyhow!("scheduler not found in rdb"))?; + + let schedule_at = TryInto::::try_into(schedule_row)?.scheduled_at; + let row_ptr = schedule_row.pointer(); + + if let ScheduleAt::Time(_) = schedule_at { + db.delete(&mut tx, table_id, [row_ptr]); + } + tx.commit(&ctx); + Ok(()) +} diff --git a/crates/core/src/host/wasm_common/module_host_actor.rs b/crates/core/src/host/wasm_common/module_host_actor.rs index 09ba1c8cd..e58902aa7 100644 --- a/crates/core/src/host/wasm_common/module_host_actor.rs +++ b/crates/core/src/host/wasm_common/module_host_actor.rs @@ -467,8 +467,8 @@ impl ModuleInstance for WasmModuleInstance { })) } - fn call_reducer(&mut self, params: CallReducerParams) -> ReducerCallResult { - crate::callgrind_flag::invoke_allowing_callgrind(|| self.call_reducer_with_tx(None, params)) + fn call_reducer(&mut self, tx: Option, params: CallReducerParams) -> ReducerCallResult { + crate::callgrind_flag::invoke_allowing_callgrind(|| self.call_reducer_with_tx(tx, params)) } } diff --git a/crates/lib/src/scheduler.rs b/crates/lib/src/scheduler.rs index 6c487bb22..9257d5d59 100644 --- a/crates/lib/src/scheduler.rs +++ b/crates/lib/src/scheduler.rs @@ -3,6 +3,7 @@ use std::fmt::Debug; use spacetimedb_bindings_macro::{Deserialize, Serialize}; use spacetimedb_sats::{ algebraic_value::de::{ValueDeserializeError, ValueDeserializer}, + db::error::TypeError, de::Deserialize as _, impl_deserialize, impl_serialize, impl_st, AlgebraicType, AlgebraicValue, }; @@ -17,9 +18,11 @@ impl Duration { pub fn get_type() -> AlgebraicType { AlgebraicType::U64 } +} - pub fn to_timestamp(&self, from: Timestamp) -> Timestamp { - Timestamp(self.0 + from.0) +impl From for std::time::Duration { + fn from(value: Duration) -> Self { + Self::from_micros(value.0) } }