finish delaqueue logic

This commit is contained in:
Shubham Mishra
2024-06-12 22:45:12 +05:30
parent 0a4111b983
commit d311525e45
7 changed files with 173 additions and 105 deletions
+10 -1
View File
@@ -106,7 +106,16 @@ pub async fn call<S: ControlStateDelegate + NodeDelegate>(
return Err((StatusCode::NOT_FOUND, format!("{:#}", anyhow::anyhow!(e))).into());
}
let result = match module
.call_reducer(caller_identity, Some(client_address), None, None, None, &reducer, args)
.call_reducer(
None,
caller_identity,
Some(client_address),
None,
None,
None,
&reducer,
args,
)
.await
{
Ok(rcr) => Ok(rcr),
@@ -233,6 +233,7 @@ impl ClientConnection {
) -> Result<ReducerCallResult, ReducerCallError> {
self.module
.call_reducer(
None,
self.id.identity,
Some(self.id.address),
Some(self.sender()),
+2 -1
View File
@@ -113,7 +113,8 @@ impl InstanceEnv {
args: Vec<u8>,
time: Timestamp,
) -> Result<ScheduledReducerId, ScheduleError> {
self.scheduler.schedule(reducer, args, time)
self.scheduler.schedule(reducer, args, time);
todo!()
}
#[tracing::instrument(skip_all)]
+8 -4
View File
@@ -391,7 +391,7 @@ pub trait ModuleInstance: Send + 'static {
fn update_database(&mut self, fence: u128) -> anyhow::Result<UpdateDatabaseResult>;
fn call_reducer(&mut self, params: CallReducerParams) -> ReducerCallResult;
fn call_reducer(&mut self, tx: Option<MutTxId>, params: CallReducerParams) -> ReducerCallResult;
}
pub struct CallReducerParams {
@@ -434,8 +434,8 @@ impl<T: Module> ModuleInstance for AutoReplacingModuleInstance<T> {
self.check_trap();
ret
}
fn call_reducer(&mut self, params: CallReducerParams) -> ReducerCallResult {
let ret = self.inst.call_reducer(params);
fn call_reducer(&mut self, tx: Option<MutTxId>, params: CallReducerParams) -> ReducerCallResult {
let ret = self.inst.call_reducer(tx, params);
self.check_trap();
ret
}
@@ -692,6 +692,7 @@ impl ModuleHost {
let result = self
.call_reducer_inner(
None,
caller_identity,
Some(caller_address),
None,
@@ -774,6 +775,7 @@ impl ModuleHost {
async fn call_reducer_inner(
&self,
tx: Option<MutTxId>,
caller_identity: Identity,
caller_address: Option<Address>,
client: Option<Arc<ClientConnectionSender>>,
@@ -792,7 +794,7 @@ impl ModuleHost {
let caller_address = caller_address.unwrap_or(Address::__DUMMY);
self.call(reducer_name, move |inst| {
inst.call_reducer(CallReducerParams {
inst.call_reducer(None, CallReducerParams {
timestamp: Timestamp::now(),
caller_identity,
caller_address,
@@ -809,6 +811,7 @@ impl ModuleHost {
pub async fn call_reducer(
&self,
tx: Option<MutTxId>,
caller_identity: Identity,
caller_address: Option<Address>,
client: Option<Arc<ClientConnectionSender>>,
@@ -822,6 +825,7 @@ impl ModuleHost {
}
let res = self
.call_reducer_inner(
tx,
caller_identity,
caller_address,
client,
+145 -95
View File
@@ -1,35 +1,34 @@
use std::sync::Arc;
use anyhow::{anyhow, Error};
use futures::StreamExt;
use rustc_hash::FxHashMap;
use sled::transaction::{ConflictableTransactionError::Abort as TxAbort, TransactionError};
use sled::transaction::TransactionError;
use spacetimedb_lib::bsatn::ser::BsatnError;
use spacetimedb_lib::bsatn::to_vec;
use spacetimedb_lib::de::Deserialize;
use spacetimedb_lib::scheduler::ScheduleAt;
use spacetimedb_lib::{bsatn, AlgebraicType, AlgebraicValue, ProductValue, SumType, Timestamp};
use spacetimedb_lib::{bsatn, AlgebraicValue, Timestamp};
use spacetimedb_primitives::TableId;
use spacetimedb_sats::algebraic_value::ser::ValueSerializer;
use spacetimedb_sats::satn::Satn;
use spacetimedb_sats::SumValue;
use spacetimedb_table::layout::PrimitiveType;
use sqlparser::ast::Interval;
use spacetimedb_table::table::RowRef;
use tokio::sync::mpsc;
use tokio::time::Instant;
use tokio_util::time::delay_queue::Expired;
use tokio_util::time::{delay_queue, DelayQueue};
use crate::db::datastore::locking_tx_datastore::state_view::StateView;
use crate::db::datastore::locking_tx_datastore::MutTxId;
use crate::db::datastore::system_tables::{StScheduledRow, ST_SCHEDULED_ID};
use crate::db::datastore::traits::IsolationLevel;
use crate::db::relational_db::RelationalDB;
use crate::error::DBError;
use crate::execution_context::ExecutionContext;
use crate::messages::control_worker_api::ScheduleUpdate;
use super::module_host::WeakModuleHost;
use super::{ModuleHost, ReducerArgs, ReducerCallError};
use super::{ModuleHost, ReducerArgs};
#[derive(Copy, Clone, Eq, PartialEq, Hash)]
pub struct ScheduledReducerId {
/// Scheduled Table id, this should have a entry in `ST_SCHEDULED`
table_id: TableId,
/// Refer to a particular schedule (row) in scheduled table
scheduled_id: u64,
}
@@ -43,10 +42,8 @@ enum SchedulerMessage {
Cancel { id: ScheduledReducerId },
}
#[derive(spacetimedb_sats::ser::Serialize, spacetimedb_sats::de::Deserialize)]
struct ScheduledReducer {
at: Timestamp,
reducer: String,
reducer: Box<str>,
bsatn_args: Vec<u8>,
}
@@ -82,6 +79,24 @@ impl Scheduler {
}
}
struct StModuleScheduledRow {
scheduled_id: u64,
scheduled_at: ScheduleAt,
}
impl TryFrom<RowRef<'_>> for StModuleScheduledRow {
type Error = Error;
fn try_from(row: RowRef<'_>) -> Result<Self, Self::Error> {
Ok(StModuleScheduledRow {
scheduled_id: row.read_col(0)?,
scheduled_at: row
.read_col::<AlgebraicValue>(1)?
.try_into()
.map_err(|_| anyhow!("Error reading scheduled_at"))?,
})
}
}
impl SchedulerStarter {
// TODO(cloutiertyler): This whole start dance is scuffed, but I don't have
// time to make it better right now.
@@ -93,29 +108,18 @@ impl SchedulerStarter {
for row in self.db.iter(&ctx, &tx, ST_SCHEDULED_ID)? {
let scheduled_table = StScheduledRow::try_from(row).expect("Error reading scheduled table row");
// Insert each entry (row) in DelayQueue
// Insert each entry (row) of scheduled tables in DelayQueue
for row_ref in self.db.iter(&ctx, &tx, scheduled_table.table_id)? {
// First two columns for schjedile table are fixed i.e `schedule_id` and
// `ScheuleAt` respectivelty
let scheduler: StModuleScheduledRow = row_ref.try_into()?;
let scheduled_id: u64 = row_ref
.read_col::<u64>(1)
.map_err(|_| anyhow::anyhow!("Error reading scheduled_at"))?;
let schedule_at: ScheduleAt = row_ref
.read_col::<AlgebraicValue>(2)
.map_err(|_| anyhow::anyhow!("Error reading scheduled_at"))?
.try_into()
.map_err(|_| anyhow::anyhow!("Error reading scheduled_at"))?;
let at_time = match schedule_at {
let at_time = match scheduler.scheduled_at {
ScheduleAt::Time(time) => time,
ScheduleAt::Interval(dur) => todo!(),
};
queue.insert(
ScheduledReducerId {
table_id: scheduled_table.table_id,
scheduled_id,
scheduled_id: scheduler.scheduled_id,
},
at_time.to_duration_from_now(),
);
@@ -176,12 +180,7 @@ pub enum ScheduleError {
}
impl Scheduler {
pub fn schedule(
&self,
reducer: String,
bsatn_args: Vec<u8>,
at: Timestamp,
) -> Result<ScheduledReducerId, ScheduleError> {
pub fn schedule(&self, table_id: TableId, row_id: u64, at: Timestamp) -> Result<ScheduledReducerId, ScheduleError> {
// Check that `at` is within `tokio_utils::time::DelayQueue`'s accepted time-range.
//
// `DelayQueue` uses a sliding window,
@@ -209,14 +208,9 @@ impl Scheduler {
return Err(ScheduleError::DelayTooLong(at));
}
let reducer = ScheduledReducer {
at,
reducer,
bsatn_args,
};
let id = ScheduledReducerId {
table_id: TableId::default(),
scheduled_id: 0,
table_id,
scheduled_id: row_id,
};
// if the actor has exited, it's fine to ignore; it means that the host actor calling
// schedule will exit soon as well, and it'll be scheduled to run when the module host restarts
@@ -225,21 +219,7 @@ impl Scheduler {
}
pub fn cancel(&self, id: ScheduledReducerId) {
// let res = self.db.transaction(|tx| {
// tx.remove(&id.0.to_le_bytes())?;
// Ok(())
// });
// match res {
// Ok(()) => {
// // if it's exited it's not gonna run it :) see also the comment in schedule()
// let _ = self.tx.send(MsgOrExit::Msg(SchedulerMessage::Cancel { id }));
// }
// // we could return an error here, but that would give them information that
// // there exists a scheduled reducer with this id. like returning a HTTP 400
// // instead of a 404
// Err(TransactionError::Abort(())) => {}
// Err(TransactionError::Storage(e)) => panic!("sled error: {e:?}"),
// }
let _ = self.tx.send(MsgOrExit::Msg(SchedulerMessage::Cancel { id }));
}
pub fn close(&self) {
@@ -286,43 +266,113 @@ impl SchedulerActor {
}
async fn handle_queued(&mut self, id: Expired<ScheduledReducerId>) {
// let id = id.into_inner();
// self.key_map.remove(&id);
// let Some(module_host) = self.module_host.upgrade() else {
// return;
// };
// let Some(scheduled) = self.db.get(id.0.to_le_bytes()).unwrap() else {
// return;
// };
// let scheduled: ScheduledReducer = bsatn::from_slice(&scheduled).unwrap();
let id = id.into_inner();
self.key_map.remove(&id);
// let db = self.db.clone();
// tokio::spawn(async move {
// let info = module_host.info();
// let identity = info.identity;
// // TODO: pass a logical "now" timestamp to this reducer call, but there's some
// // intricacies to get right (how much drift to tolerate? what kind of tokio::time::MissedTickBehavior do we want?)
// let res = module_host
// .call_reducer(
// identity,
// // Scheduled reducers take `None` as the caller address.
// None,
// None,
// None,
// None,
// &scheduled.reducer,
// ReducerArgs::Bsatn(scheduled.bsatn_args.into()),
// )
// .await;
// if !matches!(res, Err(ReducerCallError::NoSuchModule(_))) {
// // if we didn't actually call the reducer because the module exited, leave
// // the ScheduledReducer in the database for when the module restarts
// let _ = db.remove(id.0.to_le_bytes());
// }
// match res {
// Ok(_) => {}
// Err(e) => log::error!("invoking scheduled reducer failed: {e:#}"),
// }
// });
if let Some(module_host) = self.module_host.upgrade() {
let db = module_host.dbic().relational_db.clone();
let tx = db.begin_mut_tx(IsolationLevel::Serializable);
let identity = module_host.info().identity;
match self.proccess_schedule(id, &db, &tx) {
Ok((reducer, is_repeated)) => {
tokio::spawn(async move {
let res = module_host
.call_reducer(
Some(tx),
identity,
None,
None,
None,
None,
&reducer.reducer,
ReducerArgs::Bsatn(reducer.bsatn_args.into()),
)
.await;
// Delete the entry from table if it is not repeated schedule in new tx
if !is_repeated {
let _ = delete_schedule_from_table(id, &db);
}
if let Err(e) = res {
log::error!("invoking scheduled reducer failed: {e:#}");
};
});
}
Err(e) => {
log::error!("invoking scheduled reducer failed: {e:#}");
},
}
}
}
fn proccess_schedule(
&mut self,
id: ScheduledReducerId,
db: &RelationalDB,
tx: &MutTxId,
) -> Result<(ScheduledReducer, bool), anyhow::Error> {
let ctx = ExecutionContext::internal(db.address());
let scheduled_id = AlgebraicValue::from(id.scheduled_id);
let table_id = id.table_id;
// Find reducer name from `ST_SCHEDULED` table
let reducer_entry = db
.iter_by_col_eq_mut(&ctx, &tx, ST_SCHEDULED_ID, 0, &AlgebraicValue::from(table_id))?
.next()
.ok_or(anyhow!("scheduled table entry not exist in st_scheduled"))?;
let reducer_name = reducer_entry.read_col::<Box<str>>(0)?;
// Check if expired schedule exists in the relational_db
let schedule = db
.iter_by_col_eq_mut(&ctx, &tx, table_id, 0, &scheduled_id)?
.next()
.ok_or(anyhow!("scheduler not found in rdb"))?;
let schedule_at = TryInto::<StModuleScheduledRow>::try_into(schedule)?.scheduled_at;
let reducer_args_av = &schedule.to_product_value().elements[2..0];
let reducer_arg_bsatn = bsatn::to_vec(&reducer_args_av)?;
let mut is_repeated = false;
if let ScheduleAt::Interval(dur) = schedule_at {
is_repeated = true;
self.queue.insert(
ScheduledReducerId {
table_id,
scheduled_id: id.scheduled_id,
},
dur.into(),
);
}
Ok((
ScheduledReducer {
reducer: reducer_name,
bsatn_args: reducer_arg_bsatn,
},
is_repeated,
))
}
}
fn delete_schedule_from_table(id: ScheduledReducerId, db: &RelationalDB) -> Result<(), anyhow::Error> {
let mut tx = db.begin_mut_tx(IsolationLevel::Serializable);
let ctx = ExecutionContext::internal(db.address());
let scheduled_id = AlgebraicValue::from(id.scheduled_id);
let table_id = id.table_id;
let schedule_row = db
.iter_by_col_eq_mut(&ctx, &mut tx, table_id, 0, &scheduled_id)?
.next()
.ok_or(anyhow!("scheduler not found in rdb"))?;
let schedule_at = TryInto::<StModuleScheduledRow>::try_into(schedule_row)?.scheduled_at;
let row_ptr = schedule_row.pointer();
if let ScheduleAt::Time(_) = schedule_at {
db.delete(&mut tx, table_id, [row_ptr]);
}
tx.commit(&ctx);
Ok(())
}
@@ -467,8 +467,8 @@ impl<T: WasmInstance> ModuleInstance for WasmModuleInstance<T> {
}))
}
fn call_reducer(&mut self, params: CallReducerParams) -> ReducerCallResult {
crate::callgrind_flag::invoke_allowing_callgrind(|| self.call_reducer_with_tx(None, params))
fn call_reducer(&mut self, tx: Option<MutTxId>, params: CallReducerParams) -> ReducerCallResult {
crate::callgrind_flag::invoke_allowing_callgrind(|| self.call_reducer_with_tx(tx, params))
}
}
+5 -2
View File
@@ -3,6 +3,7 @@ use std::fmt::Debug;
use spacetimedb_bindings_macro::{Deserialize, Serialize};
use spacetimedb_sats::{
algebraic_value::de::{ValueDeserializeError, ValueDeserializer},
db::error::TypeError,
de::Deserialize as _,
impl_deserialize, impl_serialize, impl_st, AlgebraicType, AlgebraicValue,
};
@@ -17,9 +18,11 @@ impl Duration {
pub fn get_type() -> AlgebraicType {
AlgebraicType::U64
}
}
pub fn to_timestamp(&self, from: Timestamp) -> Timestamp {
Timestamp(self.0 + from.0)
impl From<Duration> for std::time::Duration {
fn from(value: Duration) -> Self {
Self::from_micros(value.0)
}
}